Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, Depends, HTTPException, status | |
| from typing import List | |
| import logging | |
| from app.utils.jwt import get_current_customer_id | |
| from app.models.address_model import AddressModel | |
| from app.schemas.address_schema import ( | |
| AddressCreateRequest, AddressUpdateRequest, AddressResponse, | |
| AddressListResponse, SetDefaultAddressRequest, AddressOperationResponse | |
| ) | |
# Module-level logger, named after this module via __name__.
| logger = logging.getLogger(__name__) | |
# Router object for this module. NOTE(review): no route decorators are visible
# on the handlers in this view, so how they are attached to it is not shown.
| router = APIRouter() | |
async def get_user_addresses(current_customer_id: str = Depends(get_current_customer_id)):
    """
    Get all delivery addresses for the current user.

    This endpoint is JWT protected and requires a valid Bearer token.

    Args:
        current_customer_id: Customer id supplied by the
            ``get_current_customer_id`` dependency.

    Returns:
        AddressListResponse with ``success=True``, the converted addresses and
        ``total_count``. A customer who has no saved addresses receives an
        empty list and a count of 0; that is a successful lookup, not a
        failure.

    Raises:
        HTTPException: 500 when loading or converting the addresses raises.
    """
    try:
        logger.info(f"Get addresses request for user: {current_customer_id}")

        addresses = await AddressModel.get_user_addresses(current_customer_id)

        # `or []` keeps a None result from the model from breaking iteration.
        address_responses = [
            AddressResponse(
                address_id=addr["address_id"],  # Use the new address_id field
                address_type=addr["address_type"],
                address_line_1=addr["address_line_1"],
                address_line_2=addr.get("address_line_2", ""),
                city=addr["city"],
                state=addr["state"],
                postal_code=addr["postal_code"],
                country=addr.get("country", "India"),
                landmark=addr.get("landmark", ""),
                is_default=addr.get("is_default", False),
                created_at=addr.get("created_at"),
                updated_at=addr.get("updated_at"),
            )
            for addr in addresses or []
        ]

        if not address_responses:
            # Having no addresses yet is a valid state; report it as success
            # so clients can tell it apart from a real retrieval error.
            return AddressListResponse(
                success=True,
                message="No addresses found",
                addresses=[],
                total_count=0,
            )

        return AddressListResponse(
            success=True,
            message="Addresses retrieved successfully",
            addresses=address_responses,
            total_count=len(address_responses),
        )
    except Exception as e:
        logger.error(f"Error getting addresses for user {current_customer_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to retrieve addresses"
        ) from e
async def create_address(
    address_data: AddressCreateRequest,
    current_customer_id: str = Depends(get_current_customer_id)
):
    """
    Create a new delivery address for the current user.

    Args:
        address_data: Validated request body describing the new address.
        current_customer_id: Customer id supplied by the
            ``get_current_customer_id`` dependency.

    Returns:
        AddressOperationResponse carrying the stored address on success, or
        ``success=False`` when the model layer returns no address id.

    Raises:
        HTTPException: 400 when the customer already has 5 addresses;
            500 when any other error occurs.
    """
    try:
        logger.info(f"Create address request for user: {current_customer_id}")

        # Check if user already has 5 addresses (limit)
        existing_addresses = await AddressModel.get_user_addresses(current_customer_id)
        if len(existing_addresses) >= 5:
            raise HTTPException(
                status_code=400,
                detail="Maximum of 5 addresses allowed per user"
            )

        # The first address always becomes the default; otherwise honour the
        # request. bool() normalises an unset (None) flag to False.
        is_default = len(existing_addresses) == 0 or bool(address_data.is_default)

        # Store the computed flag. Previously it was calculated and then
        # dropped, so a first address was only default if the client said so.
        address_payload = address_data.dict()
        address_payload["is_default"] = is_default

        address_id = await AddressModel.create_address(current_customer_id, address_payload)

        if not address_id:
            return AddressOperationResponse(
                success=False,
                message="Failed to create address"
            )

        # Read the record back so the response reflects what was stored.
        created_address = await AddressModel.get_address_by_id(current_customer_id, address_id)

        address_response = AddressResponse(
            address_id=created_address["address_id"],  # Use the new address_id field
            address_type=created_address["address_type"],
            address_line_1=created_address["address_line_1"],
            address_line_2=created_address.get("address_line_2", ""),
            city=created_address["city"],
            state=created_address["state"],
            postal_code=created_address["postal_code"],
            country=created_address.get("country", "India"),
            is_default=created_address.get("is_default", False),
            landmark=created_address.get("landmark", ""),
            created_at=created_address.get("created_at"),
            updated_at=created_address.get("updated_at"),
        )

        return AddressOperationResponse(
            success=True,
            message="Address created successfully",
            address=address_response
        )
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 400 above) pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Error creating address for user {current_customer_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create address"
        ) from e
async def update_address(
    address_id: str,
    address_data: AddressUpdateRequest,
    current_customer_id: str = Depends(get_current_customer_id)
):
    """
    Apply a partial update to one of the current user's delivery addresses.

    Only request fields that are not ``None`` are forwarded to the model.

    Args:
        address_id: Identifier of the address to change.
        address_data: Request body; every field is treated as optional.
        current_customer_id: Customer id supplied by the
            ``get_current_customer_id`` dependency.

    Returns:
        AddressOperationResponse with the re-read address on success, or
        ``success=False`` when the model reports the update did not succeed.

    Raises:
        HTTPException: 404 if the address is not found, 403 if its
            ``customer_id`` differs from the caller, 400 if the request
            carries no fields to change, 500 on any other error.
    """
    try:
        logger.info(f"Update address request for user: {current_customer_id}, address: {address_id}")

        # The address must exist and belong to the caller.
        stored = await AddressModel.get_address_by_id(current_customer_id, address_id)
        if not stored:
            raise HTTPException(status_code=404, detail="Address not found")
        if stored["customer_id"] != current_customer_id:
            raise HTTPException(status_code=403, detail="Access denied")

        # Collect every field the client supplied, in a fixed order.
        candidate_fields = (
            "address_type",
            "address_line_1",
            "address_line_2",
            "city",
            "state",
            "is_default",
            "landmark",
            "postal_code",
            "country",
        )
        changes = {}
        for field_name in candidate_fields:
            supplied = getattr(address_data, field_name)
            if supplied is not None:
                changes[field_name] = supplied

        if not changes:
            raise HTTPException(status_code=400, detail="No fields to update")

        was_updated = await AddressModel.update_address(current_customer_id, address_id, changes)
        if not was_updated:
            return AddressOperationResponse(
                success=False,
                message="Failed to update address"
            )

        # Re-read so the response reflects the stored state.
        refreshed = await AddressModel.get_address_by_id(current_customer_id, address_id)
        return AddressOperationResponse(
            success=True,
            message="Address updated successfully",
            address=AddressResponse(
                address_id=refreshed["address_id"],
                address_type=refreshed["address_type"],
                address_line_1=refreshed["address_line_1"],
                address_line_2=refreshed.get("address_line_2", ""),
                city=refreshed["city"],
                state=refreshed["state"],
                postal_code=refreshed["postal_code"],
                country=refreshed.get("country", "India"),
                landmark=refreshed.get("landmark", ""),
                is_default=refreshed.get("is_default", False),
                created_at=refreshed.get("created_at"),
                updated_at=refreshed.get("updated_at"),
            )
        )
    except HTTPException:
        # Keep the intended status codes raised above.
        raise
    except Exception as e:
        logger.error(f"Error updating address {address_id} for user {current_customer_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to update address"
        )
async def delete_address(
    address_id: str,
    current_customer_id: str = Depends(get_current_customer_id)
):
    """
    Remove one of the current user's delivery addresses.

    If the address being removed is flagged as default and the customer has
    other addresses, the first of those is made the default before the
    delete call is issued.

    Args:
        address_id: Identifier of the address to delete.
        current_customer_id: Customer id supplied by the
            ``get_current_customer_id`` dependency.

    Returns:
        AddressOperationResponse whose ``success`` mirrors the outcome
        reported by the model's delete call.

    Raises:
        HTTPException: 404 if the address is not found, 403 if its
            ``customer_id`` differs from the caller, 500 on any other error.
    """
    try:
        logger.info(f"Delete address request for user: {current_customer_id}, address: {address_id}")

        # The address must exist and belong to the caller.
        target = await AddressModel.get_address_by_id(current_customer_id, address_id)
        if not target:
            raise HTTPException(status_code=404, detail="Address not found")
        if target["customer_id"] != current_customer_id:
            raise HTTPException(status_code=403, detail="Access denied")

        if target.get("is_default", False):
            # Hand the default flag to another address, if one exists.
            all_addresses = await AddressModel.get_user_addresses(current_customer_id)
            remaining = [
                entry for entry in all_addresses
                if entry.get("address_id") != address_id
            ]
            if remaining:
                successor_id = remaining[0]["address_id"]
                await AddressModel.set_default_address(current_customer_id, successor_id)

        was_deleted = await AddressModel.delete_address(current_customer_id, address_id)
        if not was_deleted:
            return AddressOperationResponse(
                success=False,
                message="Failed to delete address"
            )

        return AddressOperationResponse(
            success=True,
            message="Address deleted successfully"
        )
    except HTTPException:
        # Keep the intended status codes raised above.
        raise
    except Exception as e:
        logger.error(f"Error deleting address {address_id} for user {current_customer_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to delete address"
        )
async def set_default_address(
    request: SetDefaultAddressRequest,
    current_customer_id: str = Depends(get_current_customer_id)
):
    """
    Mark one of the current user's addresses as the default delivery address.

    Args:
        request: Request body; its ``address_id`` names the address to promote.
        current_customer_id: Customer id supplied by the
            ``get_current_customer_id`` dependency.

    Returns:
        AddressOperationResponse with the re-read address on success, or
        ``success=False`` when the model reports the change did not succeed.

    Raises:
        HTTPException: 404 if the address is not found, 403 if its
            ``customer_id`` differs from the caller, 500 on any other error.
    """
    try:
        logger.info(f"Set default address request for user: {current_customer_id}, address: {request.address_id}")

        # The address must exist and belong to the caller.
        stored = await AddressModel.get_address_by_id(current_customer_id, request.address_id)
        if not stored:
            raise HTTPException(status_code=404, detail="Address not found")
        if stored["customer_id"] != current_customer_id:
            raise HTTPException(status_code=403, detail="Access denied")

        was_set = await AddressModel.set_default_address(current_customer_id, request.address_id)
        if not was_set:
            return AddressOperationResponse(
                success=False,
                message="Failed to set default address"
            )

        # Re-read so the response reflects the stored state.
        refreshed = await AddressModel.get_address_by_id(current_customer_id, request.address_id)

        # NOTE(review): this is the only handler in this file that passes
        # contact_name / contact_phone to AddressResponse, and the only one
        # that echoes the request's address_id instead of the stored one.
        # Confirm against the AddressResponse schema whether this is intended.
        payload = AddressResponse(
            address_id=request.address_id,
            address_type=refreshed["address_type"],
            contact_name=refreshed.get("contact_name", ""),
            contact_phone=refreshed.get("contact_phone", ""),
            address_line_1=refreshed["address_line_1"],
            address_line_2=refreshed.get("address_line_2", ""),
            city=refreshed["city"],
            state=refreshed["state"],
            postal_code=refreshed["postal_code"],
            country=refreshed.get("country", "India"),
            landmark=refreshed.get("landmark", ""),
            is_default=refreshed.get("is_default", False),
            created_at=refreshed.get("created_at"),
            updated_at=refreshed.get("updated_at"),
        )

        return AddressOperationResponse(
            success=True,
            message="Default address set successfully",
            address=payload
        )
    except HTTPException:
        # Keep the intended status codes raised above.
        raise
    except Exception as e:
        logger.error(f"Error setting default address for user {current_customer_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to set default address"
        )