# Test / src /db_utils /sql_utils.py
# Архипов Дмитрий
# test
# 565e754
from typing import Hashable, Optional, Literal
import pandas as pd
from sqlalchemy import text
from src.config import sql_client
def sql_drop(table: str):
    """Drop ``table`` if it exists, printing any failure instead of raising.

    The statement is executed on the connection yielded by
    ``sql_client.begin()``. Every exception raised while opening that context
    or running the statement is caught and printed, so the function returns
    ``None`` whether or not the drop succeeded.

    Args:
        table: Name of the table to drop. It is interpolated into the SQL
            text verbatim.

    NOTE(review): because ``table`` is not quoted or validated, it should only
    ever come from trusted code -- confirm against callers.
    """
    try:
        with sql_client.begin() as transaction:
            drop_statement = text(f"drop table if exists {table};")
            transaction.execute(drop_statement)
    except Exception as error:
        # Best-effort by design: report the problem and carry on.
        print("Ошибка:", error)
def sql_dump_df(
    df: pd.DataFrame,
    table: str,
    if_exists: Literal["replace", "append"] = "append",
) -> Optional[int]:
    """Write ``df`` into the SQL table ``table`` through ``sql_client``.

    Args:
        df: Frame to persist; its index is not written (``index=False``).
        table: Name of the destination table.
        if_exists: Passed straight to ``DataFrame.to_sql``; either
            ``"append"`` (the default) or ``"replace"``.

    Returns:
        Whatever ``DataFrame.to_sql`` returns for this call.
    """
    write_options = {"if_exists": if_exists, "index": False}
    return df.to_sql(table, sql_client, **write_options)
def sql_get_table(table: str) -> pd.DataFrame:
    """Load every row of ``table`` into a DataFrame.

    Args:
        table: Name of the table to read. It is interpolated into the query
            text verbatim, so it must come from trusted code.

    Returns:
        The result of ``select * from <table>`` as read by ``pandas.read_sql``
        over a connection obtained from ``sql_client``.
    """
    select_all = f"select * from {table}"
    with sql_client.connect() as connection:
        return pd.read_sql(select_all, connection)
def sql_get_by_id(id_: Hashable) -> Optional[dict]:
    """Fetch the first ``posts`` row whose ``ctid`` equals ``id_``.

    Args:
        id_: Value bound to the ``:id`` parameter of the query.
            NOTE(review): ``ctid`` is presumably PostgreSQL's physical row
            identifier, e.g. ``"(0,1)"`` -- confirm against callers.

    Returns:
        The matching row as a plain ``dict`` keyed by column name, or ``None``
        when no row matches.
    """
    query = text("SELECT * FROM posts WHERE ctid = :id")
    with sql_client.connect() as conn:
        row = conn.execute(query, {"id": id_}).mappings().first()
    # ``first()`` yields a read-only row mapping or None; hand callers the
    # plain dict the signature promises and make the "not found" case explicit.
    return dict(row) if row is not None else None
def sql_get_by_ids(ids_: list[Hashable]) -> list[dict]:
    """Fetch all ``posts`` rows whose ``ctid`` is one of ``ids_``.

    Args:
        ids_: Collection of identifiers to look up. It is materialised into a
            list before being bound to the ``:ids`` parameter of
            ``ctid = ANY(:ids)``. Pass a collection, not a single id.

    Returns:
        One plain ``dict`` per matching row, keyed by column name. An empty
        ``ids_`` yields ``[]`` without querying the database.
    """
    ids = list(ids_)
    if not ids:
        # Nothing can match an empty id set; skip the round trip (and avoid
        # binding an empty array, which has no element type to infer).
        return []

    query = text("SELECT * FROM posts WHERE ctid = ANY(:ids)")
    with sql_client.connect() as conn:
        rows = conn.execute(query, {"ids": ids}).mappings().all()
    # Convert the read-only row mappings into the dicts the signature declares.
    return [dict(row) for row in rows]
def sql_fetch_batch(batch_size: int = 16, offset: int = 0):
    """Return one page of ``(ctid, content)`` rows from ``posts``.

    Rows are ordered by ``ctid``; ``batch_size`` is bound to ``LIMIT`` and
    ``offset`` to ``OFFSET``.

    Args:
        batch_size: Maximum number of rows to return (default 16).
        offset: Number of ordered rows to skip before the page (default 0).

    Returns:
        The list produced by ``.mappings().all()`` for the query, i.e. one
        mapping per row with the ``ctid`` and ``content`` columns.
    """
    page_query = text("""
SELECT ctid, content
FROM posts
ORDER BY ctid
LIMIT :limit
OFFSET :offset
""")
    bind_params = {"limit": batch_size, "offset": offset}
    with sql_client.connect() as connection:
        result = connection.execute(page_query, bind_params)
        return result.mappings().all()
def sql_get_by_date(message_date: str):
    """Return every ``posts`` row whose ``message_dt`` equals ``message_date``.

    Args:
        message_date: Value bound to the ``:message_date`` parameter and
            compared for equality with the ``message_dt`` column.

    Returns:
        The list produced by ``.mappings().all()`` for the query, i.e. one
        mapping per matching row with all columns of ``posts``.
    """
    date_query = text(
        """
SELECT *
FROM posts
WHERE message_dt = :message_date
"""
    )
    with sql_client.connect() as connection:
        matches = connection.execute(
            date_query, {"message_date": message_date}
        ).mappings()
        return matches.all()