Spaces:
Runtime error
Runtime error
File size: 1,232 Bytes
86b135a fc9ff85 86b135a fc9ff85 86b135a f24e569 fc9ff85 f24e569 86b135a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from db import db # assumes db is an instance of AsyncIOMotorDatabase
from bson import ObjectId
from bson.errors import InvalidId
# Application instance on which the route handlers below are registered.
app = FastAPI()
# Request body schema used by the item-creation endpoint. Both fields are
# declared without defaults, so each must be supplied as a string.
class Item(BaseModel):
    name: str  # item name
    description: str  # free-text description of the item
# POST /items/ - insert the submitted item into the "items" collection and
# report the ID generated for it.
@app.post("/items/")
async def create_item(item: Item):
    items_collection = db["items"]
    # The validated model is stored as a plain dict.
    payload = item.dict()
    outcome = await items_collection.insert_one(payload)
    # The inserted ID is stringified before being placed in the response.
    new_id = str(outcome.inserted_id)
    return {"id": new_id, "message": "Item created"}
# GET /items/ - return every document found in the "items" collection, each
# with its "_id" key replaced by a string "id" key.
@app.get("/items/")
async def read_items():
    def _expose_id(doc):
        # Swap the "_id" entry for its string form under "id", in place.
        doc["id"] = str(doc.pop("_id"))
        return doc

    # find() is called without a filter; the result is iterated asynchronously
    # (per the import note, db is assumed to be a Motor database).
    return [_expose_id(doc) async for doc in db["items"].find()]
# GET / - respond with a fixed greeting payload.
@app.get("/")
def index():
    greeting = {"message": "Hello World"}
    return greeting
# GET /items/{item_id} - look up a single item by its ID.
#
# Raises HTTPException(400) when item_id cannot be parsed as an ObjectId and
# HTTPException(404) when no document in "items" has that ID. On success the
# document is returned with its "_id" replaced by a string "id" field.
@app.get("/items/{item_id}")
async def read_item(item_id: str):
    try:
        oid = ObjectId(item_id)
    except InvalidId as exc:
        # Chain the cause so the original parse failure stays attached to the
        # HTTP error instead of showing up as an error raised during handling.
        raise HTTPException(
            status_code=400, detail="Invalid item ID format"
        ) from exc

    item = await db["items"].find_one({"_id": oid})
    # A missing document is signalled by None; test for that explicitly
    # rather than relying on truthiness.
    if item is None:
        raise HTTPException(status_code=404, detail="Item not found")

    # Swap the "_id" entry for its string form under "id".
    item["id"] = str(item.pop("_id"))
    return item
|