| from typing import Hashable, Optional, Literal | |
| import pandas as pd | |
| from sqlalchemy import text | |
| from src.config import sql_client | |
| def sql_drop(table: str): | |
| try: | |
| with sql_client.begin() as conn: | |
| conn.execute(text(f"drop table if exists {table};")) | |
| except Exception as e: | |
| print("Ошибка:", e) | |
| def sql_dump_df( | |
| df: pd.DataFrame, | |
| table: str, | |
| if_exists: Literal["replace", "append"] = "append", | |
| ) -> Optional[int]: | |
| return df.to_sql(table, sql_client, if_exists=if_exists, index=False) | |
| def sql_get_table(table: str) -> pd.DataFrame: | |
| with sql_client.connect() as conn: | |
| df = pd.read_sql(f"""select * from {table}""", conn) | |
| return df | |
| def sql_get_by_id(id_: Hashable) -> dict: | |
| with sql_client.connect() as conn: | |
| row = ( | |
| conn.execute( | |
| text("SELECT * FROM posts WHERE ctid = :id"), | |
| {"id": id_}, | |
| ) | |
| .mappings() | |
| .first() | |
| ) | |
| return row | |
| def sql_get_by_ids(ids_: Hashable) -> list[dict]: | |
| with sql_client.connect() as conn: | |
| rows = ( | |
| conn.execute( | |
| text("SELECT * FROM posts WHERE ctid = ANY(:ids)"), | |
| {"ids": ids_}, | |
| ) | |
| .mappings() | |
| .all() | |
| ) | |
| return rows | |
| def sql_fetch_batch(batch_size: int = 16, offset: int = 0): | |
| query = text(""" | |
| SELECT ctid, content | |
| FROM posts | |
| ORDER BY ctid | |
| LIMIT :limit | |
| OFFSET :offset | |
| """) | |
| with sql_client.connect() as conn: | |
| rows = conn.execute(query, {"limit": batch_size, "offset": offset}).mappings().all() | |
| return rows | |
| def sql_get_by_date(message_date: str): | |
| with sql_client.connect() as conn: | |
| rows = ( | |
| conn.execute( | |
| text( | |
| """ | |
| SELECT * | |
| FROM posts | |
| WHERE message_dt = :message_date | |
| """ | |
| ), | |
| {"message_date": message_date}, | |
| ) | |
| .mappings() | |
| .all() | |
| ) | |
| return rows | |