# embeddinggemma_300m_fastapi / test_router.py
# WildOjisan's picture
# .
# 1d27b68
# crud_router.py
from fastapi import APIRouter, Depends, Query, status
from pydantic import BaseModel
from typing import Dict, Any, List, Optional
import asyncpg
from datetime import datetime
# Import the get_db_connection function defined in database_conn.py.
from database_conn import get_db_connection
# ----------------------------------------------------
# Pydantic model definitions
# ----------------------------------------------------
# Request body accepted by the POST handler: both fields are required.
class ItemCreate(BaseModel):
    # Stored in the ``name`` column of t_test1.
    name: str
    # Stored in the ``price`` column of t_test1.
    price: float
# Request body accepted by the PUT handler. Every field is optional; a field
# left as ``None`` is skipped when the UPDATE statement is built.
class ItemUpdate(BaseModel):
    # ``Optional[...]`` (already imported and used elsewhere in this file) is
    # used instead of ``X | None`` so the annotation can be evaluated on
    # interpreters that predate PEP 604.
    name: Optional[str] = None
    price: Optional[float] = None
# One row of t_test1 as returned to the client. The handlers in this file build
# it from records whose columns are id, name, price and created_at.
class ItemInDB(BaseModel):
    id: int
    name: str
    price: float
    created_at: datetime
# Standardized API response model
# Response envelope declared as ``response_model`` on every route in this file.
class APIResponse(BaseModel):
    # True when the operation succeeded, False otherwise.
    success: bool
    # Human-readable status or error message; defaults to an empty string.
    msg: str = ""
    # Payload (a single item, a list of items, or None); defaults to None.
    data: Any = None
# ----------------------------------------------------
# Create the APIRouter instance
# ----------------------------------------------------
# Router that the CRUD handlers below register themselves on; all of its
# routes are tagged "crud_t_test1".
router = APIRouter(tags=["crud_t_test1"])
# ----------------------------------------------------
# 1. CREATE (create data) - POST /crud/t_test1/
# ----------------------------------------------------
@router.post("/", response_model=APIResponse)
async def create_t_test1_item(
    item: ItemCreate,
    conn: asyncpg.Connection = Depends(get_db_connection),
):
    """
    Insert a new item into the 't_test1' table (INSERT).

    Args:
        item: Name and price of the item to insert.
        conn: Connection supplied by the ``get_db_connection`` dependency.

    Returns:
        A dict with ``success``, ``msg`` and ``data`` keys. On success ``data``
        holds the inserted row as an ``ItemInDB``; otherwise it is ``None``.
        Exceptions raised while inserting are reported through ``msg`` instead
        of being propagated.
    """
    # Same statement text as a single multi-line literal, spelled line by line.
    insert_sql = (
        "\n"
        "INSERT INTO t_test1 (name, price)\n"
        "VALUES ($1, $2)\n"
        "RETURNING id, name, price, created_at;\n"
    )
    try:
        row = await conn.fetchrow(insert_sql, item.name, item.price)
        if row is not None:
            created = ItemInDB(**dict(row))
            return {
                "success": True,
                "msg": "Item created successfully.",
                "data": created,
            }
        # RETURNING produced no row.
        return {
            "success": False,
            "msg": "Item creation failed unexpectedly: No record returned.",
            "data": None,
        }
    except Exception as exc:
        return {
            "success": False,
            "msg": f"Database INSERT error: {exc!r}",
            "data": None,
        }
# ----------------------------------------------------
# 2. READ (list all or fetch one) - GET /crud/t_test1/?item_id={id} or /crud/t_test1/
# ----------------------------------------------------
@router.get("/", response_model=APIResponse)
async def read_t_test1_items(
    # item_id is handled as an optional query parameter.
    item_id: Optional[int] = None,
    conn: asyncpg.Connection = Depends(get_db_connection),
):
    """
    Read a single item when item_id is given, otherwise the full list (SELECT).

    Args:
        item_id: Value matched against the ``id`` column, or ``None`` to list
            every row.
        conn: Connection supplied by the ``get_db_connection`` dependency.

    Returns:
        A dict with ``success``, ``msg`` and ``data`` keys. ``data`` is an
        ``ItemInDB`` for a single lookup, a list of ``ItemInDB`` ordered by id
        for a listing, and ``None`` when the lookup fails. Exceptions raised
        while querying are reported through ``msg`` instead of being
        propagated.
    """
    response_data = {"success": True, "msg": "", "data": None}
    try:
        # Compare against None explicitly: a plain truthiness test would treat
        # item_id=0 as "not supplied" and return the whole table instead.
        if item_id is not None:
            # Single lookup (WHERE id)
            query = "SELECT id, name, price, created_at FROM t_test1 WHERE id = $1;"
            record = await conn.fetchrow(query, item_id)
            if record is None:
                response_data["success"] = False
                response_data["msg"] = f"Read failed: Item with ID {item_id} not found."
                return response_data
            response_data["msg"] = "Item read successfully."
            response_data["data"] = ItemInDB(**dict(record))
        else:
            # Full listing
            query = "SELECT id, name, price, created_at FROM t_test1 ORDER BY id;"
            records = await conn.fetch(query)
            response_data["data"] = [ItemInDB(**dict(record)) for record in records]
            response_data["msg"] = f"Found {len(records)} items."
        return response_data
    except Exception as e:
        response_data["success"] = False
        response_data["msg"] = f"Database SELECT error: {e!r}"
        return response_data
# ----------------------------------------------------
# 3. UPDATE (update data) - PUT /crud/t_test1/?item_id={id}
# ----------------------------------------------------
@router.put("/", response_model=APIResponse)
async def update_t_test1_item(
    # item_id is handled as a required query parameter.
    item_id: int,
    item: ItemUpdate,
    conn: asyncpg.Connection = Depends(get_db_connection),
):
    """
    Update the item with the given ID in 't_test1' (UPDATE).

    Only the fields of ``item`` that are not ``None`` are written. When no
    field is provided, the current row is returned unchanged.

    Args:
        item_id: Value matched against the ``id`` column.
        item: Fields to change; ``None`` means "leave as is".
        conn: Connection supplied by the ``get_db_connection`` dependency.

    Returns:
        A dict with ``success``, ``msg`` and ``data`` keys. On success ``data``
        holds the row as an ``ItemInDB``; otherwise it is ``None``. Exceptions
        raised by either query are reported through ``msg`` instead of being
        propagated.
    """
    response_data = {"success": True, "msg": "", "data": None}

    # Build the SET clause from the supplied fields only. Column names are
    # fixed literals; values travel as positional query parameters.
    updates = []
    params = []
    for column, value in (("name", item.name), ("price", item.price)):
        if value is not None:
            params.append(value)
            updates.append(f"{column} = ${len(params)}")

    # Both queries run inside the try so that a failure of the existence check
    # is reported through the response envelope like every other database
    # error in this router, instead of escaping as an unhandled exception.
    try:
        # 1. Check that the item exists. The full row is fetched so the
        #    "nothing to update" case can return it without a second query.
        select_query = "SELECT id, name, price, created_at FROM t_test1 WHERE id = $1;"
        current_record = await conn.fetchrow(select_query, item_id)
        if current_record is None:
            response_data["success"] = False
            response_data["msg"] = f"Update failed: Item with ID {item_id} not found."
            return response_data

        # 2. Nothing to change: return the current row together with the
        #    explanatory message (previously the message was set and then
        #    discarded in favour of the read handler's response).
        if not updates:
            response_data["msg"] = "No updates provided. Returning current item data."
            response_data["data"] = ItemInDB(**dict(current_record))
            return response_data

        # 3. Run the UPDATE; the ID is the last positional parameter.
        set_clause = ", ".join(updates)
        params.append(item_id)
        where_param_index = len(params)
        update_query = (
            "\n"
            "UPDATE t_test1\n"
            f"SET {set_clause}\n"
            f"WHERE id = ${where_param_index}\n"
            "RETURNING id, name, price, created_at;\n"
        )
        updated_record = await conn.fetchrow(update_query, *params)
        if updated_record is None:
            response_data["success"] = False
            response_data["msg"] = "Update failed: No record was modified."
            return response_data
        response_data["msg"] = "Item updated successfully."
        response_data["data"] = ItemInDB(**dict(updated_record))
        return response_data
    except Exception as e:
        response_data["success"] = False
        response_data["msg"] = f"Database UPDATE error: {e!r}"
        return response_data
# ----------------------------------------------------
# 4. DELETE (delete data) - DELETE /crud/t_test1/?item_id={id}
# ----------------------------------------------------
@router.delete("/", response_model=APIResponse)
async def delete_t_test1_item(
    # item_id is handled as a required query parameter.
    item_id: int,
    conn: asyncpg.Connection = Depends(get_db_connection),
):
    """
    Delete the item with the given ID from 't_test1' (DELETE).

    Args:
        item_id: Value matched against the ``id`` column.
        conn: Connection supplied by the ``get_db_connection`` dependency.

    Returns:
        A dict with ``success``, ``msg`` and ``data`` keys; ``data`` is always
        ``None``. ``success`` is False when no row was deleted or when an
        exception was raised, in which case ``msg`` carries the details.
    """
    delete_sql = "DELETE FROM t_test1 WHERE id = $1;"
    try:
        status_tag = await conn.execute(delete_sql, item_id)
        # The last whitespace-separated token of the returned status string is
        # parsed as the number of deleted rows.
        tokens = status_tag.split()
        affected = int(tokens[-1])
        if affected != 0:
            return {
                "success": True,
                "msg": f"Item with ID {item_id} deleted successfully.",
                "data": None,
            }
        return {
            "success": False,
            "msg": f"Delete failed: Item with ID {item_id} not found.",
            "data": None,
        }
    except Exception as err:
        return {
            "success": False,
            "msg": f"Database DELETE error: {err!r}",
            "data": None,
        }