File size: 3,086 Bytes
f972805
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import sqlite3

import pandas as pd


class Pandas2SQL:
    """
    Classe pour convertir un DataFrame pandas en table SQLite
    avec détection automatique des types de colonnes.
    """

    def __init__(self, db_path=":memory:"):
        """
        Initialise la connexion à la base de données SQLite

        Args:
            db_path (str): Chemin vers le fichier de base de données SQLite
                           (par défaut utilise une base de données en mémoire)
        """
        self.db_path = db_path

    def _get_sqlite_type(self, pandas_dtype):
        """
        Convertit un type pandas en type SQLite approprié

        Args:
            pandas_dtype: Type pandas

        Returns:
            str: Type SQLite correspondant
        """
        if pd.api.types.is_integer_dtype(pandas_dtype):
            return "INTEGER"
        elif pd.api.types.is_float_dtype(pandas_dtype):
            return "REAL"
        elif pd.api.types.is_bool_dtype(pandas_dtype):
            return "INTEGER"  # SQLite n'a pas de type booléen, utilise INTEGER (0/1)
        elif pd.api.types.is_datetime64_dtype(pandas_dtype):
            return "TIMESTAMP"
        else:
            return "TEXT"  # Pour les types object, string, category, etc.

    def create_table(self, df, table_name, if_exists="replace", primary_key=None):
        """
        Crée une table SQLite basée sur un DataFrame pandas

        Args:
            df (pandas.DataFrame): DataFrame à convertir
            table_name (str): Nom de la table à créer
            if_exists (str): Action si la table existe ('fail', 'replace', 'append')
            primary_key (str): Nom de la colonne à définir comme clé primaire (optionnel)
        """
        # Création du schéma de table basé sur les types de colonnes
        columns = []
        for col_name, dtype in df.dtypes.items():
            sqlite_type = self._get_sqlite_type(dtype)
            col_def = f'"{col_name}" {sqlite_type}'
            if primary_key and col_name == primary_key:
                col_def += " PRIMARY KEY"
            columns.append(col_def)

        # Création de la requête SQL
        create_query = f'CREATE TABLE "{table_name}" ({", ".join(columns)})'

        # Connexion et création de la table
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        try:
            if if_exists == "replace":
                cursor.execute(f'DROP TABLE IF EXISTS "{table_name}"')
            elif if_exists == "fail":
                cursor.execute(
                    f'SELECT name FROM sqlite_master WHERE type="table" AND name="{table_name}"'
                )
                if cursor.fetchone():
                    raise ValueError(f"La table '{table_name}' existe déjà.")

            cursor.execute(create_query)

            # Insertion des données
            df.to_sql(table_name, conn, if_exists="append", index=False)
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()