Spaces:
Running
Running
File size: 3,086 Bytes
f972805 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
import sqlite3
import pandas as pd
class Pandas2SQL:
"""
Classe pour convertir un DataFrame pandas en table SQLite
avec détection automatique des types de colonnes.
"""
def __init__(self, db_path=":memory:"):
"""
Initialise la connexion à la base de données SQLite
Args:
db_path (str): Chemin vers le fichier de base de données SQLite
(par défaut utilise une base de données en mémoire)
"""
self.db_path = db_path
def _get_sqlite_type(self, pandas_dtype):
"""
Convertit un type pandas en type SQLite approprié
Args:
pandas_dtype: Type pandas
Returns:
str: Type SQLite correspondant
"""
if pd.api.types.is_integer_dtype(pandas_dtype):
return "INTEGER"
elif pd.api.types.is_float_dtype(pandas_dtype):
return "REAL"
elif pd.api.types.is_bool_dtype(pandas_dtype):
return "INTEGER" # SQLite n'a pas de type booléen, utilise INTEGER (0/1)
elif pd.api.types.is_datetime64_dtype(pandas_dtype):
return "TIMESTAMP"
else:
return "TEXT" # Pour les types object, string, category, etc.
def create_table(self, df, table_name, if_exists="replace", primary_key=None):
"""
Crée une table SQLite basée sur un DataFrame pandas
Args:
df (pandas.DataFrame): DataFrame à convertir
table_name (str): Nom de la table à créer
if_exists (str): Action si la table existe ('fail', 'replace', 'append')
primary_key (str): Nom de la colonne à définir comme clé primaire (optionnel)
"""
# Création du schéma de table basé sur les types de colonnes
columns = []
for col_name, dtype in df.dtypes.items():
sqlite_type = self._get_sqlite_type(dtype)
col_def = f'"{col_name}" {sqlite_type}'
if primary_key and col_name == primary_key:
col_def += " PRIMARY KEY"
columns.append(col_def)
# Création de la requête SQL
create_query = f'CREATE TABLE "{table_name}" ({", ".join(columns)})'
# Connexion et création de la table
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
if if_exists == "replace":
cursor.execute(f'DROP TABLE IF EXISTS "{table_name}"')
elif if_exists == "fail":
cursor.execute(
f'SELECT name FROM sqlite_master WHERE type="table" AND name="{table_name}"'
)
if cursor.fetchone():
raise ValueError(f"La table '{table_name}' existe déjà.")
cursor.execute(create_query)
# Insertion des données
df.to_sql(table_name, conn, if_exists="append", index=False)
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
|