Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException | |
| from typing import Optional, List | |
| from pydantic import BaseModel | |
| import csv | |
| import os | |
# FastAPI application instance for this module.
# NOTE(review): none of the CRUD functions visible in this file carry a route
# decorator -- confirm where (or whether) they are registered on `app`.
app = FastAPI()
# 1. Data model ("blueprint") for a customer record
class Customer(BaseModel):
    """A single customer record.

    ``id``, ``name`` and ``email`` are required; ``phone`` and ``address``
    are optional and default to ``None``.
    """

    id: int
    name: str
    email: str
    phone: Optional[str] = None
    address: Optional[str] = None
# 2. CSV "database" configuration
# Store the file under /tmp only when /tmp is a directory this process can
# write to; otherwise fall back to a path relative to the current working
# directory. A bare existence check would also accept a read-only or
# non-directory /tmp and make the first write fail.
if os.path.isdir("/tmp") and os.access("/tmp", os.W_OK):
    CSV_FILE = "/tmp/customers.csv"
else:
    CSV_FILE = "customers.csv"

# Column order used for the header row and for every data row.
CSV_HEADERS = ["id", "name", "email", "phone", "address"]
def initialize_csv():
    """Create the CSV file containing only the header row if it is missing.

    An existing file is left untouched.
    """
    try:
        # Mode "x" creates the file exclusively and fails when it already
        # exists, so the existence check and the creation are one step and
        # an existing file can never be truncated here.
        with open(CSV_FILE, "x", newline="", encoding="utf-8") as file:
            csv.writer(file).writerow(CSV_HEADERS)
    except FileExistsError:
        # The file is already present; nothing to initialize.
        pass
def read_customers_from_csv() -> List[Customer]:
    """Load every customer stored in the CSV file.

    Returns:
        One ``Customer`` per data row, in file order, or an empty list when
        the file does not exist. Empty ``phone``/``address`` cells are
        returned as ``None``.

    Raises:
        ValueError: If a row's ``id`` cell cannot be parsed by ``int()``.
    """
    # Open directly instead of checking os.path.exists() first, so a file
    # that disappears between the check and the open cannot raise here.
    try:
        file = open(CSV_FILE, "r", newline="", encoding="utf-8")
    except FileNotFoundError:
        return []

    with file:
        return [
            Customer(
                id=int(row["id"]),
                name=row["name"],
                email=row["email"],
                phone=row["phone"] or None,
                address=row["address"] or None,
            )
            for row in csv.DictReader(file)
        ]
def write_customers_to_csv(customers: List[Customer]):
    """Overwrite the CSV file with the header row followed by ``customers``.

    A ``phone`` or ``address`` that is ``None`` (or empty) is stored as an
    empty cell.
    """
    with open(CSV_FILE, 'w', newline='', encoding='utf-8') as handle:
        out = csv.writer(handle)
        out.writerow(CSV_HEADERS)
        out.writerows(
            [
                entry.id,
                entry.name,
                entry.email,
                entry.phone or "",
                entry.address or "",
            ]
            for entry in customers
        )
# Make sure the CSV file exists (with its header row) as soon as this module
# is loaded.
initialize_csv()
# Create
def create_customer(customer: Customer):
    """Append ``customer`` to the CSV store and return it.

    Raises:
        HTTPException: 409 if a stored customer already has the same ``id``.
    """
    customers = read_customers_from_csv()
    # update_customer and delete_customer act only on the first record with
    # a given id, so a duplicate id would leave an unreachable record.
    if any(existing.id == customer.id for existing in customers):
        raise HTTPException(status_code=409, detail="customer already exists")
    customers.append(customer)
    write_customers_to_csv(customers)
    return customer
# Read
def get_customer():
    """Return every customer currently stored in the CSV file."""
    all_customers = read_customers_from_csv()
    return all_customers
# Update
def update_customer(id: int, customer: Customer):
    """Replace the stored customer whose id equals ``id`` with ``customer``.

    Returns:
        The replacement record, ``customer``.

    Raises:
        HTTPException: 404 if no stored customer has the given ``id``;
            409 if ``customer.id`` differs from ``id`` and is already used
            by another stored customer.
    """
    customers = read_customers_from_csv()
    position = next(
        (i for i, existing in enumerate(customers) if existing.id == id),
        None,
    )
    if position is None:
        raise HTTPException(status_code=404, detail="customer not found")

    # The replacement may carry a different id than the one addressed. Refuse
    # it when that id already belongs to another record, otherwise the file
    # would end up with two rows sharing one id.
    if customer.id != id and any(c.id == customer.id for c in customers):
        raise HTTPException(status_code=409, detail="customer id already in use")

    customers[position] = customer
    write_customers_to_csv(customers)
    return customer
# Delete
def delete_customer(id: int):
    """Remove the first stored customer whose id equals ``id`` and return it.

    Raises:
        HTTPException: 404 if no stored customer has the given ``id``.
    """
    records = read_customers_from_csv()
    position = next(
        (idx for idx, record in enumerate(records) if record.id == id),
        None,
    )
    if position is None:
        raise HTTPException(status_code=404, detail="customer not found")

    removed = records.pop(position)
    write_customers_to_csv(records)
    return removed