|
|
|
|
|
|
|
|
import asyncpg |
|
|
from typing import AsyncGenerator |
|
|
|
|
|
|
|
|
DB_USER = "neondb_owner" |
|
|
DB_PASSWORD = "npg_rFA9DgloybO0" |
|
|
DB_HOST = "ep-rapid-truth-ad2e6rnq-pooler.c-2.us-east-1.aws.neon.tech" |
|
|
DB_NAME = "neondb" |
|
|
DB_PORT = 5432 |
|
|
DB_SSL = 'require' |
|
|
|
|
|
|
|
|
pool: asyncpg.Pool = None |
|
|
|
|
|
|
|
|
async def connect_to_db(): |
|
|
global pool |
|
|
print("DB μ°κ²° ν μ΄κΈ°ν μ€...") |
|
|
try: |
|
|
pool = await asyncpg.create_pool( |
|
|
user=DB_USER, |
|
|
password=DB_PASSWORD, |
|
|
host=DB_HOST, |
|
|
database=DB_NAME, |
|
|
port=DB_PORT, |
|
|
ssl=DB_SSL |
|
|
) |
|
|
print("DB μ°κ²° μ±κ³΅.") |
|
|
except Exception as e: |
|
|
print(f"DB μ°κ²° μ€ν¨: {e}") |
|
|
raise e |
|
|
|
|
|
|
|
|
async def close_db_connection(): |
|
|
global pool |
|
|
if pool: |
|
|
print("DB μ°κ²° ν μ’
λ£ μ€...") |
|
|
await pool.close() |
|
|
print("DB μ°κ²° ν μ’
λ£ μλ£.") |
|
|
|
|
|
|
|
|
async def get_db_connection() -> AsyncGenerator[asyncpg.Connection, None]: |
|
|
"""DB μ°κ²° νμμ 컀λ₯μ
μ κ°μ Έμ μλν¬μΈνΈμ μ 곡ν©λλ€.""" |
|
|
global pool |
|
|
if not pool: |
|
|
raise ConnectionError("DB μ°κ²° νμ΄ μ΄κΈ°νλμ§ μμμ΅λλ€.") |
|
|
|
|
|
|
|
|
async with pool.acquire() as connection: |
|
|
yield connection |