| | import aiomysql |
| |
|
| | from __init__ import mysql_option |
| |
|
| |
|
| | class AIOMYSQL: |
| | def __init__(self) -> None: |
| | self.pool = None |
| |
|
| | async def init_pool(self): |
| | try: |
| | __pool = await aiomysql.create_pool( |
| | host=mysql_option["HOST"], |
| | port=mysql_option["PORT"], |
| | user=mysql_option["USERNAME"], |
| | password=mysql_option["PASSWORD"], |
| | db=mysql_option["DATABASE"], |
| | charset="utf8", |
| | autocommit=False, |
| | minsize=5, |
| | maxsize=10, |
| | cursorclass=aiomysql.DictCursor, |
| | ) |
| | return __pool |
| | except: |
| | raise ("aiomysql create_pool error") |
| |
|
| | async def get_cursor(self): |
| | conn = await self.pool.acquire() |
| | cur = await conn.cursor() |
| | return conn, cur |
| |
|
| | async def query(self, sql, param=None): |
| | conn, cur = await self.get_cursor() |
| | try: |
| | await cur.execute(sql, param) |
| | except: |
| | await cur.rollback() |
| | print("aiomysql query error") |
| | else: |
| | return await cur.fetchall() |
| | finally: |
| | if cur: |
| | await cur.close() |
| | |
| | await self.pool.release(conn) |
| |
|
| | async def update(self, sql, param=None): |
| | conn, cur = await self.get_cursor() |
| | try: |
| | await cur.execute(sql, param) |
| | except: |
| | await cur.rollback() |
| | print("aiomysql query error") |
| | else: |
| | await conn.commit() |
| | finally: |
| | if cur: |
| | await cur.close() |
| | |
| | await self.pool.release(conn) |
| |
|
| |
|
| | async def get_aiomysql_instance(): |
| | instance = AIOMYSQL() |
| | instance.pool = await instance.init_pool() |
| | return instance |
| |
|