File size: 2,156 Bytes
565e754
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from typing import Hashable, Optional, Literal

import pandas as pd
from sqlalchemy import text

from src.config import sql_client


def sql_drop(table: str):
    try:
        with sql_client.begin() as conn:
            conn.execute(text(f"drop table if exists {table};"))
    except Exception as e:
        print("Ошибка:", e)


def sql_dump_df(
        df: pd.DataFrame, 
        table: str, 
        if_exists: Literal["replace", "append"] = "append",
    ) -> Optional[int]:
    return df.to_sql(table, sql_client, if_exists=if_exists, index=False)


def sql_get_table(table: str) -> pd.DataFrame:
    with sql_client.connect() as conn:
        df = pd.read_sql(f"""select * from {table}""", conn)

    return df


def sql_get_by_id(id_: Hashable) -> dict:
    with sql_client.connect() as conn:
        row = (
            conn.execute(
                text("SELECT * FROM posts WHERE ctid = :id"),
                {"id": id_},
            )
            .mappings()
            .first()
        )

    return row


def sql_get_by_ids(ids_: Hashable) -> list[dict]:
    with sql_client.connect() as conn:
        rows = (
            conn.execute(
                text("SELECT * FROM posts WHERE ctid = ANY(:ids)"),
                {"ids": ids_},
            )
            .mappings()
            .all()
        )

    return rows


def sql_fetch_batch(batch_size: int = 16, offset: int = 0):
    query = text("""
        SELECT ctid, content
        FROM posts
        ORDER BY ctid
        LIMIT :limit
        OFFSET :offset
    """)

    with sql_client.connect() as conn:
        rows = conn.execute(query, {"limit": batch_size, "offset": offset}).mappings().all()

    return rows


def sql_get_by_date(message_date: str):
    with sql_client.connect() as conn:
        rows = (
            conn.execute(
                text(
                    """
                    SELECT *
                    FROM posts
                    WHERE message_dt = :message_date
                    """
                ),
                {"message_date": message_date},
            )
            .mappings()
            .all()
        )

    return rows