Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
|
@@ -1,6 +1,8 @@
|
|
| 1 |
from fastapi import FastAPI, HTTPException
|
| 2 |
-
from typing import Optional, List
|
| 3 |
from pydantic import BaseModel
|
|
|
|
|
|
|
| 4 |
|
| 5 |
app = FastAPI()
|
| 6 |
# 1 Define blueprint
|
|
@@ -11,33 +13,80 @@ class Customer(BaseModel):
|
|
| 11 |
phone: Optional[str]=None
|
| 12 |
address: Optional[str]=None
|
| 13 |
|
| 14 |
-
# 2
|
| 15 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 16 |
#Create
|
| 17 |
@app.post("/Customer",response_model=Customer)
|
| 18 |
def create_customer(customer: Customer):
|
| 19 |
-
|
|
|
|
|
|
|
| 20 |
return customer
|
| 21 |
|
| 22 |
#Read
|
| 23 |
@app.get("/Customer",response_model=List[Customer])
|
| 24 |
def get_customer():
|
| 25 |
-
return
|
| 26 |
|
| 27 |
#Update
|
| 28 |
@app.put("/Customer/{id}",response_model=Customer)
|
| 29 |
def update_customer(id:int, customer:Customer):
|
| 30 |
-
|
|
|
|
| 31 |
if existing_customer.id == id:
|
| 32 |
-
|
|
|
|
| 33 |
return customer
|
| 34 |
-
raise HTTPException(status_code
|
| 35 |
|
| 36 |
-
#Delete
|
| 37 |
@app.delete("/Customer/{id}",response_model=Customer)
|
| 38 |
def delete_customer(id:int):
|
| 39 |
-
|
| 40 |
-
|
| 41 |
-
|
|
|
|
|
|
|
| 42 |
return deleted_customer
|
| 43 |
-
raise HTTPException(status_code
|
|
|
|
| 1 |
from fastapi import FastAPI, HTTPException
|
| 2 |
+
from typing import Optional, List
|
| 3 |
from pydantic import BaseModel
|
| 4 |
+
import csv
|
| 5 |
+
import os
|
| 6 |
|
| 7 |
app = FastAPI()
|
| 8 |
# 1 Define blueprint
|
|
|
|
| 13 |
phone: Optional[str]=None
|
| 14 |
address: Optional[str]=None
|
| 15 |
|
| 16 |
+
# 2 CSV Database Configuration
|
| 17 |
+
CSV_FILE = "/tmp/customers.csv" if os.path.exists("/tmp") else "customers.csv"
|
| 18 |
+
CSV_HEADERS = ["id", "name", "email", "phone", "address"]
|
| 19 |
+
|
| 20 |
+
def initialize_csv():
|
| 21 |
+
"""Initialize CSV file with headers if it doesn't exist"""
|
| 22 |
+
if not os.path.exists(CSV_FILE):
|
| 23 |
+
with open(CSV_FILE, 'w', newline='', encoding='utf-8') as file:
|
| 24 |
+
writer = csv.writer(file)
|
| 25 |
+
writer.writerow(CSV_HEADERS)
|
| 26 |
+
|
| 27 |
+
def read_customers_from_csv() -> List[Customer]:
|
| 28 |
+
"""Read all customers from CSV file"""
|
| 29 |
+
customers = []
|
| 30 |
+
if os.path.exists(CSV_FILE):
|
| 31 |
+
with open(CSV_FILE, 'r', newline='', encoding='utf-8') as file:
|
| 32 |
+
reader = csv.DictReader(file)
|
| 33 |
+
for row in reader:
|
| 34 |
+
customers.append(Customer(
|
| 35 |
+
id=int(row['id']),
|
| 36 |
+
name=row['name'],
|
| 37 |
+
email=row['email'],
|
| 38 |
+
phone=row['phone'] if row['phone'] else None,
|
| 39 |
+
address=row['address'] if row['address'] else None
|
| 40 |
+
))
|
| 41 |
+
return customers
|
| 42 |
+
|
| 43 |
+
def write_customers_to_csv(customers: List[Customer]):
|
| 44 |
+
"""Write all customers to CSV file"""
|
| 45 |
+
with open(CSV_FILE, 'w', newline='', encoding='utf-8') as file:
|
| 46 |
+
writer = csv.writer(file)
|
| 47 |
+
writer.writerow(CSV_HEADERS)
|
| 48 |
+
for customer in customers:
|
| 49 |
+
writer.writerow([
|
| 50 |
+
customer.id,
|
| 51 |
+
customer.name,
|
| 52 |
+
customer.email,
|
| 53 |
+
customer.phone or "",
|
| 54 |
+
customer.address or ""
|
| 55 |
+
])
|
| 56 |
+
|
| 57 |
+
# Initialize CSV file on startup
|
| 58 |
+
initialize_csv()
|
| 59 |
#Create
|
| 60 |
@app.post("/Customer",response_model=Customer)
|
| 61 |
def create_customer(customer: Customer):
|
| 62 |
+
customers = read_customers_from_csv()
|
| 63 |
+
customers.append(customer)
|
| 64 |
+
write_customers_to_csv(customers)
|
| 65 |
return customer
|
| 66 |
|
| 67 |
#Read
|
| 68 |
@app.get("/Customer",response_model=List[Customer])
|
| 69 |
def get_customer():
|
| 70 |
+
return read_customers_from_csv()
|
| 71 |
|
| 72 |
#Update
|
| 73 |
@app.put("/Customer/{id}",response_model=Customer)
|
| 74 |
def update_customer(id:int, customer:Customer):
|
| 75 |
+
customers = read_customers_from_csv()
|
| 76 |
+
for i, existing_customer in enumerate(customers):
|
| 77 |
if existing_customer.id == id:
|
| 78 |
+
customers[i] = customer
|
| 79 |
+
write_customers_to_csv(customers)
|
| 80 |
return customer
|
| 81 |
+
raise HTTPException(status_code=404, detail="customer not found")
|
| 82 |
|
| 83 |
+
#Delete
|
| 84 |
@app.delete("/Customer/{id}",response_model=Customer)
|
| 85 |
def delete_customer(id:int):
|
| 86 |
+
customers = read_customers_from_csv()
|
| 87 |
+
for i, customer_obj in enumerate(customers):
|
| 88 |
+
if customer_obj.id == id:
|
| 89 |
+
deleted_customer = customers.pop(i)
|
| 90 |
+
write_customers_to_csv(customers)
|
| 91 |
return deleted_customer
|
| 92 |
+
raise HTTPException(status_code=404, detail="customer not found")
|