Spaces:
Runtime error
Runtime error
| import streamlit as st | |
| import psycopg2 | |
| import sqlite3 | |
| from psycopg2 import extras | |
| from datetime import datetime | |
| import logging | |
| import json | |
| import pandas as pd | |
| from typing import List, Dict, Tuple | |
| import os | |
| import sys | |
| import unicodedata | |
| # Configuration du logging | |
| logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler()]) | |
| logger = logging.getLogger(__name__) | |
| sys.stdout.reconfigure(encoding="utf-8") | |
| # Configuration de la base de données | |
| db_config = { | |
| "database": st.secrets["DB_NAME"], | |
| # "user": st.secrets["DB_USER"], | |
| # "password": st.secrets["DB_PASSWORD"], | |
| # "host": st.secrets["DB_HOST"], | |
| # "port": st.secrets["DB_PORT"] | |
| } | |
| ######################### CLASSES ######################### | |
| class DBManager: | |
| def __init__(self, db_config: Dict, schema_file: str) -> None: | |
| """ | |
| Initialise la connexion à la base PostgreSQL et charge le schéma. | |
| Args: | |
| db_config (Dict) : dictionnaire avec les informations de connexion (host, database, user, password). | |
| schema_file (str) : chemin vers le fichier JSON contenant le schéma de la base. | |
| """ | |
| self.db_config = db_config | |
| self.schema_file = schema_file | |
| self.connection = None | |
| self.cursor = None | |
| self._load_schema() | |
| self._connect_to_database() | |
| self._create_database() | |
| # self.cursor.execute("SET NAMES 'UTF8'") | |
| def _load_schema(self) -> None: | |
| """ | |
| Charge le schéma de base de données depuis un fichier JSON. | |
| """ | |
| if not os.path.exists(self.schema_file): | |
| raise FileNotFoundError(f"Fichier non trouvé : {self.schema_file}") | |
| with open(self.schema_file, "r", encoding="utf-8") as file: | |
| self.schema = json.load(file) | |
| # def _connect_to_database(self): | |
| # """Établit une connexion avec la base PostgreSQL.""" | |
| # try: | |
| # self.connection = psycopg2.connect(**self.db_config, cursor_factory=extras.DictCursor) | |
| # self.cursor = self.connection.cursor() | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| def _connect_to_database(self) -> None: | |
| """ | |
| Établit une connexion avec la base SQLite. | |
| """ | |
| try: | |
| # Connexion à la base de données SQLite | |
| self.connection = sqlite3.connect( | |
| self.db_config["database"], check_same_thread=False | |
| ) | |
| self.connection.row_factory = ( | |
| sqlite3.Row | |
| ) # Pour des résultats sous forme de dictionnaire | |
| self.cursor = self.connection.cursor() | |
| except sqlite3.Error as err: | |
| logger.error(f"Erreur de connexion : {err}") | |
| return | |
| # def _create_database(self): | |
| # """Crée les tables définies dans le schéma JSON.""" | |
| # for table_name, table_info in self.schema['tables'].items(): | |
| # create_table_query = self._generate_create_table_query(table_name, table_info['columns']) | |
| # try: | |
| # self.cursor.execute(create_table_query) | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| # finally: | |
| # self.connection.commit() | |
| import sqlite3 | |
| def _create_database(self) -> None: | |
| """ | |
| Crée les tables définies dans le schéma JSON. | |
| """ | |
| for table_name, table_info in self.schema["tables"].items(): | |
| create_table_query = self._generate_create_table_query( | |
| table_name, table_info["columns"] | |
| ) | |
| try: | |
| self.cursor.execute(create_table_query) | |
| except sqlite3.Error as err: | |
| logger.error( | |
| f"Erreur lors de la création de la table {table_name}: {err}" | |
| ) | |
| return | |
| finally: | |
| self.connection.commit() | |
| # def _generate_create_table_query(self, table_name: str, columns: List[Dict]) -> str: | |
| # """Génère une requête SQL pour créer une table en fonction du schéma.""" | |
| # column_definitions = [] | |
| # for column in columns: | |
| # column_definition = f"{column['name']} {column['type']}" | |
| # if 'constraints' in column and column['constraints']: | |
| # column_definition += " " + " ".join(column['constraints']) | |
| # column_definitions.append(column_definition) | |
| # columns_str = ", ".join(column_definitions) | |
| # return f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_str});" | |
| def _generate_create_table_query(self, table_name: str, columns: List[Dict]) -> str: | |
| """ | |
| Génère une requête SQL pour créer une table en fonction du schéma. | |
| Args: | |
| table_name (str): nom de la table. | |
| columns (List[Dict]): colonnes de la table à créer. | |
| Returns: | |
| str : la requête SQL CREATE TABLE paramétrée. | |
| """ | |
| column_definitions = [] | |
| for column in columns: | |
| column_definition = f"{column['name']} {column['type']}" | |
| # Conversion quand le type n'est pas compatible avec SQLite (ex. : SERIAL -> INTEGER PRIMARY KEY AUTOINCREMENT) | |
| if column["type"] == "SERIAL": | |
| column_definition = ( | |
| f"{column['name']} INTEGER PRIMARY KEY AUTOINCREMENT" | |
| ) | |
| if "constraints" in column and column["constraints"]: | |
| column_definition += " " + " ".join(column["constraints"]) | |
| column_definitions.append(column_definition) | |
| columns_str = ", ".join(column_definitions) | |
| return f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_str});" | |
| # def insert_data_from_dict(self, table_name: str, data: List[Dict], id_column: str) -> List[int]: | |
| # """Insère des données dans une table à partir d'une liste de dictionnaires et retourne les IDs insérés. | |
| # table_name : str : nom de la table | |
| # data : List[Dict] : données à insérer | |
| # id_column : str : nom de la colonne d'ID à retourner | |
| # """ | |
| # columns = ", ".join(data[0].keys()) | |
| # placeholders = ", ".join(['%s' for _ in data[0].keys()]) | |
| # # Requête pour insérer les données et retourner l'ID dynamique | |
| # query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders}) RETURNING {id_column}" | |
| # ids = [] | |
| # try: | |
| # for row in data: | |
| # self.cursor.execute(query, tuple(row.values())) | |
| # inserted_id = self.cursor.fetchone()[0] | |
| # ids.append(inserted_id) | |
| # return ids | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| # finally: | |
| # self.connection.commit() | |
| def insert_data_from_dict(self, table_name: str, data: List[Dict]) -> List[int]: | |
| """ | |
| Insère des données dans une table à partir d'une liste de dictionnaires et retourne les IDs insérés. | |
| Args: | |
| table_name (str): nom de la table. | |
| data (List[Dict]): données à insérer. | |
| Returns: | |
| List[int]: liste des ID des données insérées. | |
| """ | |
| columns = ", ".join(data[0].keys()) | |
| placeholders = ", ".join( | |
| ["?" for _ in data[0].keys()] | |
| ) # SQLite utilise '?' pour les placeholders | |
| # Requête pour insérer les données | |
| query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" | |
| ids = [] | |
| try: | |
| for row in data: | |
| self.cursor.execute(query, tuple(row.values())) | |
| inserted_id = ( | |
| self.cursor.lastrowid | |
| ) # Récupère l'ID du dernier enregistrement inséré | |
| ids.append(inserted_id) | |
| return ids | |
| except sqlite3.Error as err: | |
| logger.error( | |
| f"Erreur lors de l'insertion des données dans {table_name}: {err}" | |
| ) | |
| return | |
| finally: | |
| self.connection.commit() | |
| # def insert_data_from_csv(self, table_name: str, csv_file: str) -> None: | |
| # """Insère des données dans une table à partir d'un fichier CSV.""" | |
| # df = pd.read_csv(csv_file) | |
| # columns = df.columns.tolist() | |
| # placeholders = ", ".join(['%s' for _ in columns]) | |
| # query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})" | |
| # try: | |
| # for row in df.itertuples(index=False, name=None): | |
| # self.cursor.execute(query, row) | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| # finally: | |
| # self.connection.commit() | |
| def insert_data_from_csv(self, table_name: str, csv_file: str) -> None: | |
| """ | |
| Insère des données dans une table à partir d'un fichier CSV. | |
| Args: | |
| table_name (str): nom de la table dans laquelle insérer des données. | |
| csv_file (str): lien du fichier csv contenant les données. | |
| """ | |
| df = pd.read_csv(csv_file) | |
| columns = df.columns.tolist() | |
| placeholders = ", ".join( | |
| ["?" for _ in columns] | |
| ) # Utilisation de '?' pour SQLite | |
| query = ( | |
| f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})" | |
| ) | |
| try: | |
| for row in df.itertuples(index=False, name=None): | |
| self.cursor.execute( | |
| query, row | |
| ) # Exécution de la requête avec les valeurs du CSV | |
| except sqlite3.Error as err: | |
| logger.error( | |
| f"Erreur lors de l'insertion des données depuis {csv_file} : {err}" | |
| ) | |
| finally: | |
| self.connection.commit() | |
| # def fetch_all(self, table_name: str) -> List[Tuple]: | |
| # """ | |
| # Récupère toutes les données d'une table. | |
| # Args: | |
| # table_name (str): nom de la table de laquelle récupérer des données. | |
| # Returns: | |
| # List[Tuple]: liste des enregistrements récupérés à partir de la table. | |
| # """ | |
| # try: | |
| # self.cursor.execute(f"SELECT * FROM {table_name}") | |
| # return self.cursor.fetchall() | |
| # except sqlite3.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| def fetch_all(self, table_name: str) -> List[Tuple]: | |
| """ | |
| Récupère toutes les données d'une table. | |
| Args: | |
| table_name (str): nom de la table de laquelle récupérer des données. | |
| Returns: | |
| List[Tuple]: liste des enregistrements récupérés à partir de la table. | |
| """ | |
| try: | |
| self.cursor.execute(f"SELECT * FROM {table_name}") | |
| return self.cursor.fetchall() | |
| except sqlite3.Error as err: | |
| logger.error( | |
| f"Erreur lors de la récupération des données de la table {table_name} : {err}" | |
| ) | |
| return [] # Retourne une liste vide en cas d'erreur | |
| # def execute_safe(self, query: str, params: Tuple = (), fetch: bool = False): | |
| # """ | |
| # Exécute une requête SQL avec gestion centralisée des erreurs. | |
| # :param query: Requête SQL à exécuter. | |
| # :param params: Paramètres de la requête. | |
| # :param fetch: Indique si les résultats doivent être récupérés. | |
| # :return: Résultats de la requête (si fetch est True), sinon None. | |
| # """ | |
| # try: | |
| # self.cursor.execute(query, params) | |
| # if fetch: | |
| # return self.cursor.fetchall() # Retourner les résultats si demandé | |
| # else: | |
| # self.connection.commit() # Valider les modifications | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # self.connection.rollback() # Annuler la transaction en cas d'erreur | |
| # raise RuntimeError(f"Erreur SQL : {err} | Query : {query} | Params : {params}") | |
| def execute_safe( | |
| self, query: str, params: Tuple = (), fetch: bool = False | |
| ) -> List[Tuple]: | |
| """ | |
| Exécute une requête SQL avec gestion centralisée des erreurs. | |
| Args: | |
| query (str): requête SQL à exécuter. | |
| params (Tuple): paramètres de la requête. | |
| fetch (bool): indique si les résultats doivent être récupérés. | |
| Returns: | |
| List[Tuple]: résultats de la requête (si fetch est True), sinon None. | |
| """ | |
| try: | |
| self.cursor.execute(query, params) | |
| if fetch: | |
| return self.cursor.fetchall() # Retourner les résultats si demandé | |
| else: | |
| self.connection.commit() # Valider les modifications | |
| except sqlite3.Error as err: | |
| logger.error(f"Erreur lors de l'exécution de la requête : {err}") | |
| self.connection.rollback() # Annuler la transaction en cas d'erreur | |
| raise RuntimeError( | |
| f"Erreur SQL : {err} | Query : {query} | Params : {params}" | |
| ) | |
| # def fetch_by_condition(self, table_name: str, condition: str, params: Tuple = ()) -> List[Tuple]: | |
| # """Récupère les données d'une table avec une condition.""" | |
| # query = f"SELECT * FROM {table_name} WHERE {condition}" | |
| # try: | |
| # self.cursor.execute(query, params) | |
| # return self.execute_safe(query, params, fetch=True) | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| def fetch_by_condition( | |
| self, table_name: str, condition: str, params: Tuple = () | |
| ) -> List[Tuple]: | |
| """ | |
| Récupère les données d'une table avec une condition. | |
| Args: | |
| table_name (str): nom de la table de laquelle récupérer des données. | |
| condition (str): condition qui sera inclue dans la clause WHERE pour filtrage. | |
| params (Tuple): paramètres de la requête. | |
| Returns: | |
| List[Tuple]: résultats de la requête (si fetch est True), sinon None (via la fonction execute_safe). | |
| """ | |
| query = f"SELECT * FROM {table_name} WHERE {condition}" | |
| try: | |
| # Utilise execute_safe pour exécuter la requête et récupérer les résultats | |
| return self.execute_safe(query, params, fetch=True) | |
| except sqlite3.Error as err: | |
| logger.error( | |
| f"Erreur lors de la récupération des données de {table_name} avec condition '{condition}': {err}" | |
| ) | |
| return [] | |
| # def update_data(self, table_name: str, set_clause: str, condition: str, params: Tuple) -> None: | |
| # """Met à jour des données dans une table.""" | |
| # query = f"UPDATE {table_name} SET {set_clause} WHERE {condition}" | |
| # try: | |
| # self.cursor.execute(query, params) | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| # finally: | |
| # self.connection.commit() | |
| def update_data( | |
| self, table_name: str, set_clause: str, condition: str, params: Tuple | |
| ) -> None: | |
| """ | |
| Met à jour des données dans une table. | |
| Args: | |
| table_name (str): nom de la table dont les données vont être mises à jour. | |
| set_clause (str): information qui sera inclue dans la clause SET pour la mise à jour. | |
| condition (str): condition qui sera inclue dans la clause WHERE pour filtrage. | |
| params (Tuple): paramètres de la requête. | |
| """ | |
| query = f"UPDATE {table_name} SET {set_clause} WHERE {condition}" | |
| try: | |
| self.cursor.execute(query, params) | |
| except sqlite3.Error as err: | |
| logger.error( | |
| f"Erreur lors de la mise à jour des données dans {table_name} : {err}" | |
| ) | |
| finally: | |
| self.connection.commit() # Valider les modifications | |
| # def delete_data(self, table_name: str, condition: str, params: Tuple) -> None: | |
| # """Supprime des données d'une table selon une condition.""" | |
| # query = f"DELETE FROM {table_name} WHERE {condition}" | |
| # try: | |
| # self.cursor.execute(query, params) | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| # finally: | |
| # self.connection.commit() | |
| def delete_data(self, table_name: str, condition: str, params: Tuple) -> None: | |
| """ | |
| Supprime des données d'une table selon une condition. | |
| Args: | |
| table_name (str): nom de la table dont les données vont être suprimées. | |
| condition (str): condition qui sera inclue dans la clause WHERE pour filtrage. | |
| params (Tuple): paramètres de la requête. | |
| """ | |
| query = f"DELETE FROM {table_name} WHERE {condition}" | |
| try: | |
| self.cursor.execute(query, params) | |
| except sqlite3.Error as err: | |
| logger.error( | |
| f"Erreur lors de la suppression des données dans {table_name} : {err}" | |
| ) | |
| finally: | |
| self.connection.commit() # Valider les modifications | |
| # def close_connection(self) -> None: | |
| # """Ferme la connexion à la base de données.""" | |
| # if self.connection: | |
| # self.cursor.close() | |
| # self.connection.close() | |
| def close_connection(self) -> None: | |
| """ | |
| Ferme la connexion à la base de données. | |
| """ | |
| if self.connection: | |
| try: | |
| self.cursor.close() # Fermer le curseur | |
| self.connection.close() # Fermer la connexion | |
| except sqlite3.Error as err: | |
| logger.error(f"Erreur lors de la fermeture de la connexion : {err}") | |
| # def create_index(self, table_name: str, column_name: str) -> None: | |
| # """Crée un index sur une colonne spécifique pour améliorer les performances de recherche.""" | |
| # query = f"CREATE INDEX IF NOT EXISTS idx_{table_name}_{column_name} ON {table_name} ({column_name})" | |
| # try: | |
| # self.cursor.execute(query) | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| # finally: | |
| # self.connection.commit() | |
| def create_index(self, table_name: str, column_name: str) -> None: | |
| """ | |
| Crée un index sur une colonne spécifique pour améliorer les performances de recherche. | |
| """ | |
| query = f"CREATE INDEX IF NOT EXISTS idx_{table_name}_{column_name} ON {table_name} ({column_name})" | |
| try: | |
| self.cursor.execute(query) | |
| except sqlite3.Error as err: | |
| logger.error( | |
| f"Erreur lors de la création de l'index sur {table_name} ({column_name}) : {err}" | |
| ) | |
| finally: | |
| self.connection.commit() # Valider les modifications | |
| # def select(self, query: str, params: Tuple = ()) -> List[Tuple]: | |
| # """Exécute une requête SELECT personnalisée et retourne les résultats.""" | |
| # try: | |
| # self.cursor.execute(query, params) | |
| # return self.cursor.fetchall() | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| def select(self, query: str, params: Tuple = ()) -> List[Tuple]: | |
| """ | |
| Exécute une requête SELECT personnalisée et retourne les résultats. | |
| Args: | |
| query (str): requête SELECT. | |
| params (Tuple): paramètres de la requête. | |
| Returns: | |
| List[Tuple]: liste des enregistrements récupérés. | |
| """ | |
| try: | |
| self.cursor.execute(query, params) | |
| return self.cursor.fetchall() | |
| except sqlite3.Error as err: | |
| logger.error(f"Erreur lors de l'exécution de la requête SELECT : {err}") | |
| return [] | |
| # def query(self, query, params=None): | |
| # """ | |
| # Exécute une requête SQL, en utilisant les paramètres fournis, | |
| # et retourne les résultats si nécessaire. | |
| # """ | |
| # try: | |
| # self.cursor.execute(query, params) | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| # finally: | |
| # # Si la requête est un SELECT, récupérer les résultats | |
| # if query.strip().upper().startswith("SELECT"): | |
| # return self.cursor.fetchall() | |
| # else: # Si ce n'est pas un SELECT, ne rien retourner (utile pour INSERT/UPDATE) | |
| # self.connection.commit() | |
| # return None | |
| def query(self, query: str, params: Tuple = None) -> List[Tuple]: | |
| """ | |
| Exécute une requête SQL, en utilisant les paramètres fournis, et retourne les résultats si nécessaire. | |
| Args: | |
| query (str): reqête SQL. | |
| params (Tuple): paramètres de la requête. | |
| Returns: | |
| List[Tuple]: list des enregistrements récupérés s'il s'agit d'une requête SELECT, None sinon. | |
| """ | |
| try: | |
| if params is None: | |
| self.cursor.execute(query) | |
| else: | |
| self.cursor.execute(query, params) | |
| except sqlite3.Error as err: | |
| logger.error(f"Erreur lors de l'exécution de la requête : {err}") | |
| return | |
| finally: | |
| # Si la requête est un SELECT, récupérer les résultats | |
| if query.strip().upper().startswith("SELECT"): | |
| return self.cursor.fetchall() | |
| else: # Si ce n'est pas un SELECT, ne rien retourner (utile pour INSERT/UPDATE) | |
| self.connection.commit() | |
| return None | |
| ######################### FONCTIONS ######################### | |
| # Mettre DBManager en cache | |
| def get_db_manager(): | |
| return DBManager(db_config, os.path.join("server", "db", "schema.json")) | |
| # def save_message(db_manager, id_conversation: int, role: str, content: str,temps_traitement, total_cout, impact_eco) -> None: | |
| # """ | |
| # Sauvegarde un message dans la base de données, en associant l'utilisateur à la conversation. | |
| # :param db_manager: Instance de DBManager. | |
| # :param id_conversation: ID de la conversation associée. | |
| # :param role: Rôle de l'intervenant (ex. 'user' ou 'assistant'). | |
| # :param content: Contenu du message. | |
| # """ | |
| # timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| # data = [{ | |
| # "id_conversation": id_conversation, | |
| # "role": role, | |
| # "content": content, | |
| # "timestamp": timestamp, | |
| # "temps_traitement":temps_traitement, | |
| # "total_cout": total_cout, | |
| # "impact_eco": impact_eco | |
| # }] | |
| # try: | |
| # db_manager.insert_data_from_dict("messages", data, id_column="id_message") | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion: {err}") | |
| # return | |
| def save_message( | |
| db_manager, | |
| id_conversation: int, | |
| role: str, | |
| content: str, | |
| temps_traitement: float, | |
| total_cout: float, | |
| impact_eco: float, | |
| ) -> None: | |
| """ | |
| Sauvegarde un message dans la base de données, en associant l'utilisateur à la conversation. | |
| Args: | |
| db_manager: instance de DBManager. | |
| id_conversation (int): ID de la conversation associée. | |
| role (str): rôle de l'intervenant (ex. 'user' ou 'assistant'). | |
| content (str): contenu du message. | |
| temps_traitement (float): temps de traitement. | |
| total_cout (float): coût total associé au message. | |
| impact_eco (float): impact économique du message. | |
| """ | |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| data = [ | |
| { | |
| "id_conversation": id_conversation, | |
| "role": role, | |
| "content": content, | |
| "timestamp": timestamp, | |
| "temps_traitement": temps_traitement, | |
| "total_cout": total_cout, | |
| "impact_eco": impact_eco, | |
| } | |
| ] | |
| try: | |
| # Insertion des données dans la table "messages" en utilisant la méthode d'insertion adaptée | |
| db_manager.insert_data_from_dict("messages", data) | |
| except sqlite3.Error as err: # Utilisation de sqlite3.Error pour gérer les exceptions SQLite | |
| logger.error(f"Erreur lors de la sauvegarde du message : {err}") | |
| return | |
| # def create_conversation(db_manager, title: str, id_utilisateur: int) -> int: | |
| # """ | |
| # Crée une nouvelle conversation dans la base de données, en associant l'utilisateur à la conversation. | |
| # :param db_manager: Instance de DBManager. | |
| # :param title: Titre de la conversation. | |
| # :param id_utilisateur: ID de l'utilisateur associé. | |
| # :return: ID de la conversation nouvellement créée. | |
| # """ | |
| # created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| # data = [{ | |
| # "created_at": created_at, | |
| # "title": title, | |
| # "id_utilisateur": id_utilisateur, | |
| # }] | |
| # try: | |
| # result = db_manager.insert_data_from_dict("conversations", data, id_column="id_conversation") | |
| # return result[0] | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| def create_conversation(db_manager, title: str, id_utilisateur: int) -> int: | |
| """ | |
| Crée une nouvelle conversation dans la base de données, en associant l'utilisateur à la conversation. | |
| Args: | |
| db_manager: instance de DBManager. | |
| title (str): titre de la conversation. | |
| id_utilisateur (int): ID de l'utilisateur associé. | |
| Returns: | |
| int: ID de la conversation nouvellement créée. | |
| """ | |
| created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| data = [ | |
| {"created_at": created_at, "title": title, "id_utilisateur": id_utilisateur,} | |
| ] | |
| try: | |
| result = db_manager.insert_data_from_dict("conversations", data) | |
| return result[0] # Retourne l'ID de la conversation nouvellement créée | |
| except sqlite3.Error as err: # Gestion des erreurs avec sqlite3 | |
| logger.error(f"Erreur lors de la création de la conversation : {err}") | |
| return None | |
| # def load_messages(db_manager, id_conversation: int) -> List[Dict]: | |
| # """ | |
| # Charge les messages associés à une conversation. | |
| # :param db_manager: Instance de DBManager. | |
| # :param id_conversation: ID de la conversation. | |
| # :return: Liste des messages sous forme de dictionnaires. | |
| # """ | |
| # query = """ | |
| # SELECT * | |
| # FROM messages | |
| # WHERE id_conversation = %s | |
| # ORDER BY timestamp ASC | |
| # """ | |
| # try: | |
| # result = db_manager.query(query, (id_conversation,)) | |
| # return [{"role": row["role"], "content": row["content"], "timestamp":row["timestamp"], "temps_traitement":row["temps_traitement"]} for row in result] | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| def load_messages(db_manager, id_conversation: int) -> List[Dict]: | |
| """ | |
| Charge les messages associés à une conversation. | |
| Args: | |
| db_manager: instance de DBManager. | |
| id_conversation (int): ID de la conversation. | |
| Returns: | |
| List[Dict]: liste des messages sous forme de dictionnaires. | |
| """ | |
| query = """ | |
| SELECT role, content, timestamp, temps_traitement | |
| FROM messages | |
| WHERE id_conversation = ? | |
| ORDER BY timestamp ASC | |
| """ | |
| try: | |
| result = db_manager.query(query, (id_conversation,)) | |
| # Résultats sous forme de dictionnaires, si la fonction query retourne des tuples ou dictionnaires | |
| return [ | |
| { | |
| "role": row["role"], | |
| "content": row["content"], | |
| "timestamp": row["timestamp"], | |
| "temps_traitement": row["temps_traitement"], | |
| } | |
| for row in result | |
| ] | |
| except sqlite3.Error as err: # Utilisation de sqlite3.Error pour capturer les erreurs SQLite | |
| logger.error(f"Erreur lors du chargement des messages : {err}") | |
| return [] | |
| # def load_conversations(db_manager, id_utilisateur: int) -> List[Dict]: | |
| # """ | |
| # Charge toutes les conversations enregistrées pour un utilisateur donné. | |
| # :param db_manager: Instance de DBManager. | |
| # :param id_utilisateur: ID de l'utilisateur. | |
| # :return: Liste des conversations. | |
| # """ | |
| # query = """ | |
| # SELECT * FROM conversations | |
| # WHERE id_utilisateur = %s | |
| # ORDER BY created_at DESC | |
| # """ | |
| # try: | |
| # result = db_manager.query(query, (id_utilisateur,)) | |
| # return [ | |
| # {"id_conversation": row["id_conversation"], "created_at": row["created_at"], "title": row["title"]} for row in result | |
| # ] | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| def load_conversations(db_manager, id_utilisateur: int) -> List[Dict]: | |
| """ | |
| Charge toutes les conversations enregistrées pour un utilisateur donné. | |
| Args: | |
| db_manager: instance de DBManager. | |
| id_utilisateur (int): ID de l'utilisateur. | |
| Returns: | |
| List[Dict]: liste des conversations. | |
| """ | |
| query = """ | |
| SELECT id_conversation, created_at, title | |
| FROM conversations | |
| WHERE id_utilisateur = ? | |
| ORDER BY created_at DESC | |
| """ | |
| try: | |
| result = db_manager.query(query, (id_utilisateur,)) | |
| return [ | |
| { | |
| "id_conversation": row["id_conversation"], | |
| "created_at": row["created_at"], | |
| "title": row["title"], | |
| } | |
| for row in result | |
| ] | |
| except sqlite3.Error as err: # Gestion des erreurs avec sqlite3 | |
| logger.error(f"Erreur lors du chargement des conversations : {err}") | |
| return [] | |
| # def update_conversation(db_manager, id_conversation: int, id_utilisateur: int) -> None: | |
| # """ | |
| # Met à jour le champ `created_at` d'une conversation donnée pour un utilisateur. | |
| # :param db_manager: Instance de DBManager. | |
| # :param id_conversation: ID de la conversation à mettre à jour. | |
| # :param id_utilisateur: ID de l'utilisateur. | |
| # """ | |
| # new_timer = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| # query = """ | |
| # UPDATE conversations | |
| # SET created_at = %s | |
| # WHERE id_conversation = %s AND id_utilisateur = %s | |
| # """ | |
| # try: | |
| # db_manager.query(query, (new_timer, id_conversation, id_utilisateur)) | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| def update_conversation(db_manager, id_conversation: int, id_utilisateur: int) -> None: | |
| """ | |
| Met à jour le champ `created_at` d'une conversation donnée pour un utilisateur. | |
| Args: | |
| db_manager: instance de DBManager. | |
| id_conversation (int): ID de la conversation à mettre à jour. | |
| id_utilisateur (int): ID de l'utilisateur. | |
| """ | |
| new_timer = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| query = """ | |
| UPDATE conversations | |
| SET created_at = ? | |
| WHERE id_conversation = ? AND id_utilisateur = ? | |
| """ | |
| try: | |
| db_manager.query(query, (new_timer, id_conversation, id_utilisateur)) | |
| except sqlite3.Error as err: # Utilisation de sqlite3.Error pour capturer les erreurs SQLite | |
| logger.error(f"Erreur lors de la mise à jour de la conversation : {err}") | |
| return | |
| # def update_conversation_title(db_manager, id_conversation: int, new_title: str) -> None: | |
| # """ | |
| # Met à jour le titre d'une conversation si celui-ci est encore "Nouvelle conversation". | |
| # :param db_manager: Instance de DBManager. | |
| # :param id_conversation: ID de la conversation à mettre à jour. | |
| # :param new_title: Nouveau titre de la conversation. | |
| # """ | |
| # query = """ | |
| # UPDATE conversations | |
| # SET title = %s | |
| # WHERE id_conversation = %s AND title = 'Nouvelle conversation' | |
| # """ | |
| # try: | |
| # db_manager.query(query, (new_title, id_conversation)) | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| def update_conversation_title(db_manager, id_conversation: int, new_title: str) -> None: | |
| """ | |
| Met à jour le titre d'une conversation si celui-ci est encore "Nouvelle conversation". | |
| Args: | |
| db_manager: instance de DBManager. | |
| id_conversation (int): ID de la conversation à mettre à jour. | |
| new_title (str): nouveau titre de la conversation. | |
| """ | |
| query = """ | |
| UPDATE conversations | |
| SET title = ? | |
| WHERE id_conversation = ? AND title = 'Nouvelle conversation' | |
| """ | |
| try: | |
| db_manager.query(query, (new_title, id_conversation)) | |
| except sqlite3.Error as err: # Utilisation de sqlite3.Error pour gérer les erreurs SQLite | |
| logger.error( | |
| f"Erreur lors de la mise à jour du titre de la conversation : {err}" | |
| ) | |
| return | |
| # def get_conversation_title(db_manager, id_conversation: int) -> str: | |
| # """ | |
| # Récupère le titre d'une conversation spécifique en utilisant `fetch_by_condition`. | |
| # :param db_manager: Instance de DBManager. | |
| # :param id_conversation: ID de la conversation à interroger. | |
| # :return: Le titre de la conversation ou "Nouvelle conversation". | |
| # """ | |
| # table_name = "conversations" | |
| # condition = "id_conversation = %s" | |
| # try: | |
| # results = db_manager.fetch_by_condition(table_name, condition, (id_conversation,)) | |
| # if results: | |
| # # Suppose que `title` est la troisième colonne | |
| # return results[0][2] | |
| # return "Nouvelle conversation" | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur de connexion : {err}") | |
| # return | |
| def get_conversation_title(db_manager, id_conversation: int) -> str: | |
| """ | |
| Récupère le titre d'une conversation spécifique en utilisant `fetch_by_condition`. | |
| Args: | |
| db_manager: instance de DBManager. | |
| id_conversation (int): ID de la conversation à interroger. | |
| Returns: | |
| str: le titre de la conversation ou "Nouvelle conversation". | |
| """ | |
| table_name = "conversations" | |
| condition = "id_conversation = ?" | |
| try: | |
| results = db_manager.fetch_by_condition( | |
| table_name, condition, (id_conversation,) | |
| ) | |
| if results: | |
| # Assume that `title` is the second column | |
| return results[0][ | |
| 1 | |
| ] # 1 corresponds to the index of the title column in the result | |
| return "Nouvelle conversation" | |
| except sqlite3.Error as err: # Utilisation de sqlite3.Error pour gérer les erreurs SQLite | |
| logger.error( | |
| f"Erreur lors de la récupération du titre de la conversation : {err}" | |
| ) | |
| return "Nouvelle conversation" | |
| # def delete_conversation(db_manager, id_conversation: int) -> None: | |
| # """ | |
| # Supprime une conversation et ses messages associés de la base de données. | |
| # :param db_manager: Instance de DBManager. | |
| # :param id_conversation: ID de la conversation à supprimer. | |
| # """ | |
| # try: | |
| # # Supprimer les messages liés à la conversation | |
| # query_delete_messages = "DELETE FROM messages WHERE id_conversation = %s" | |
| # db_manager.query(query_delete_messages, (id_conversation,)) | |
| # # Supprimer la conversation elle-même | |
| # query_delete_conversation = "DELETE FROM conversations WHERE id_conversation = %s" | |
| # db_manager.query(query_delete_conversation, (id_conversation,)) | |
| # print(f"✅ Conversation {id_conversation} supprimée avec succès.") | |
| # except Exception as e: | |
| # print(f"❌ Erreur lors de la suppression de la conversation {id_conversation}: {e}") | |
| def delete_conversation(db_manager, id_conversation: int) -> None: | |
| """ | |
| Supprime une conversation et ses messages associés de la base de données. | |
| Args: | |
| db_manager: instance de DBManager. | |
| id_conversation (int): ID de la conversation à supprimer. | |
| """ | |
| try: | |
| # Supprimer les messages liés à la conversation | |
| query_delete_messages = "DELETE FROM messages WHERE id_conversation = ?" | |
| db_manager.query(query_delete_messages, (id_conversation,)) | |
| # Supprimer la conversation elle-même | |
| query_delete_conversation = ( | |
| "DELETE FROM conversations WHERE id_conversation = ?" | |
| ) | |
| db_manager.query(query_delete_conversation, (id_conversation,)) | |
| logger.info(f"✅ Conversation {id_conversation} supprimée avec succès.") | |
| # print(f"✅ Conversation {id_conversation} supprimée avec succès.") | |
| except sqlite3.Error as e: # Utilisation de sqlite3.Error pour capturer les erreurs SQLite | |
| logger.error( | |
| f"❌ Erreur lors de la suppression de la conversation {id_conversation}: {e}" | |
| ) | |
| # print(f"❌ Erreur lors de la suppression de la conversation {id_conversation}: {e}") | |
| # def load_chatbot_suggestions(db_manager, user_id): | |
| # """ | |
| # Charge les suggestions du chatbot enregistrées pour un utilisateur donné. | |
| # """ | |
| # query = "SELECT repas_suggestion FROM suggestions_repas WHERE id_utilisateur = %s" | |
| # try: | |
| # db_manager.cursor.execute(query, (user_id,)) | |
| # suggestions = [row[0] for row in db_manager.cursor.fetchall()] | |
| # return suggestions | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur lors du chargement des suggestions : {err}") | |
| # return [] | |
| def load_chatbot_suggestions(db_manager, user_id: str) -> List[Tuple]: | |
| """ | |
| Charge les suggestions du chatbot enregistrées pour un utilisateur donné. | |
| Args: | |
| db_manager: instance de DBManager. | |
| user_id (int): ID de l'utilisateur. | |
| Returns: | |
| List[Tuple]: list des suggestions du chatbot. | |
| """ | |
| query = "SELECT repas_suggestion FROM suggestions_repas WHERE id_utilisateur = ?" | |
| try: | |
| db_manager.cursor.execute(query, (user_id,)) | |
| suggestions = [row[0] for row in db_manager.cursor.fetchall()] | |
| return suggestions | |
| except sqlite3.Error as err: # Utilisation de sqlite3.Error pour capturer les erreurs SQLite | |
| logger.error(f"Erreur lors du chargement des suggestions : {err}") | |
| return [] | |
| # def save_chatbot_suggestions(db_manager, user_id, suggestions): | |
| # """ | |
| # Sauvegarde les suggestions du chatbot dans la base de données. | |
| # """ | |
| # query = """ | |
| # INSERT INTO suggestions_repas (id_utilisateur, repas_suggestion, motif_suggestion) | |
| # VALUES (%s, %s, %s) | |
| # """ | |
| # try: | |
| # for suggestion in suggestions: | |
| # db_manager.cursor.execute(query, (user_id, suggestion, "Chatbot")) | |
| # db_manager.connection.commit() | |
| # except psycopg2.Error as err: | |
| # logger.error(f"Erreur lors de l'enregistrement des suggestions : {err}") | |
| def save_chatbot_suggestions(db_manager, user_id, suggestions: List[Tuple]): | |
| """ | |
| Sauvegarde les suggestions du chatbot dans la base de données. | |
| Args: | |
| db_manager: instance de DBManager. | |
| user_id (int): ID de l'utilisateur. | |
| suggestions (List[Tuple]): list des suggestions du chatbot. | |
| """ | |
| query = """ | |
| INSERT INTO suggestions_repas (id_utilisateur, repas_suggestion, motif_suggestion) | |
| VALUES (?, ?, ?) | |
| """ | |
| try: | |
| for suggestion in suggestions: | |
| db_manager.cursor.execute(query, (user_id, suggestion, "Chatbot")) | |
| db_manager.connection.commit() | |
| except sqlite3.Error as err: # Remplacer psycopg2.Error par sqlite3.Error pour SQLite | |
| logger.error(f"Erreur lors de l'enregistrement des suggestions : {err}") | |
| def save_recipes_and_ingredients(db_manager, user_id: int, recipes: List[Dict[str, str]]): | |
| """ | |
| Sauvegarde les recettes et leurs ingrédients en base de données. | |
| Args: | |
| db_manager: Instance de DBManager. | |
| user_id (int): ID de l'utilisateur. | |
| recipes (List[Dict[str, str]]): Liste des recettes contenant "titre" et "ingredients". | |
| """ | |
| query_recipe = """ | |
| INSERT INTO suggestions_repas (id_utilisateur, repas_suggestion, motif_suggestion, date_heure) | |
| VALUES (?, ?, ?, CURRENT_TIMESTAMP) | |
| """ | |
| query_ingredients = """ | |
| INSERT INTO liste_courses (id_utilisateur, ingredients, date_creation, status) | |
| VALUES (?, ?, CURRENT_TIMESTAMP, 'non acheté') | |
| """ | |
| try: | |
| for recipe in recipes: | |
| # Insérer le titre de la recette | |
| db_manager.cursor.execute(query_recipe, (user_id, recipe["titre"], "Chatbot")) | |
| # Insérer les ingrédients associés | |
| db_manager.cursor.execute(query_ingredients, (user_id, recipe["ingredients"])) | |
| db_manager.connection.commit() | |
| except sqlite3.Error as err: | |
| logger.error(f"❌ Erreur lors de l'enregistrement des recettes : {err}") | |
| def normalize_text(text): | |
| """Normalise un texte en supprimant les accents et en le mettant en minuscules""" | |
| text = text.lower().strip() | |
| text = unicodedata.normalize("NFKD", text).encode("ASCII", "ignore").decode("utf-8") | |
| return text | |
| def save_recipes_with_ingredients(db_manager, user_id, title, ingredients): | |
| """ | |
| Sauvegarde une recette et ses ingrédients en base de données, | |
| en évitant les doublons liés aux différences de majuscules ou d'accents. | |
| """ | |
| if not ingredients: | |
| print(f"⚠️ La recette '{title}' n'a pas d'ingrédients à enregistrer.") | |
| return | |
| # 🔹 Normaliser le titre pour éviter les doublons | |
| normalized_title = normalize_text(title) | |
| # 🔹 Vérifier si la recette existe déjà (en ignorant les majuscules et accents) | |
| query_check = "SELECT id_suggestion FROM suggestions_repas WHERE LOWER(repas_suggestion) = LOWER(?) AND id_utilisateur = ?" | |
| result = db_manager.execute_safe(query_check, (normalized_title, user_id), fetch=True) | |
| if result: | |
| recipe_id = result[0][0] | |
| # 🔹 Mise à jour des ingrédients si la recette existe déjà | |
| query_update = "UPDATE suggestions_repas SET ingredients = ? WHERE id_suggestion = ?" | |
| db_manager.execute_safe(query_update, (ingredients, recipe_id)) | |
| print(f"🔄 Mise à jour des ingrédients pour '{title}'.") | |
| else: | |
| # 🔹 Insertion d'une nouvelle recette avec son titre et ingrédients | |
| query_insert = "INSERT INTO suggestions_repas (id_utilisateur, repas_suggestion, ingredients) VALUES (?, ?, ?)" | |
| db_manager.execute_safe(query_insert, (user_id, title, ingredients)) | |
| print(f"✅ Recette '{title}' enregistrée avec ingrédients.") | |
| def get_recipes_and_ingredients(db_manager, user_id: int) -> List[Dict[str, str]]: | |
| """ | |
| Récupère les recettes et leurs ingrédients stockés en base. | |
| Args: | |
| db_manager: Instance de DBManager. | |
| user_id (int): ID de l'utilisateur. | |
| Returns: | |
| List[Dict[str, str]]: Liste des recettes avec leurs ingrédients. | |
| """ | |
| query = """ | |
| SELECT repas_suggestion, ingredients | |
| FROM suggestions_repas | |
| WHERE id_utilisateur = ? | |
| """ | |
| try: | |
| db_manager.cursor.execute(query, (user_id,)) | |
| recipes = [{"titre": row[0], "ingredients": row[1]} for row in db_manager.cursor.fetchall()] | |
| return recipes | |
| except sqlite3.Error as err: | |
| print(f"❌ Erreur lors de la récupération des recettes : {err}") | |
| return [] | |
| def add_ingredients_column_if_not_exists(db_manager): | |
| """ | |
| Vérifie et ajoute la colonne 'ingredients' à la table 'suggestions_repas' si elle n'existe pas. | |
| """ | |
| try: | |
| db_manager.cursor.execute("PRAGMA table_info(suggestions_repas);") | |
| columns = [row[1] for row in db_manager.cursor.fetchall()] | |
| if "ingredients" not in columns: | |
| db_manager.cursor.execute("ALTER TABLE suggestions_repas ADD COLUMN ingredients TEXT;") | |
| db_manager.connection.commit() | |
| print("✅ Colonne 'ingredients' ajoutée avec succès.") | |
| else: | |
| print("✅ La colonne 'ingredients' existe déjà.") | |
| except sqlite3.Error as err: | |
| print(f"❌ Erreur lors de la mise à jour de la table : {err}") | |
| def check_recipes_with_ingredients(db_manager, user_id): | |
| """ | |
| Vérifie si les recettes enregistrées ont bien des ingrédients. | |
| """ | |
| query = "SELECT repas_suggestion, ingredients FROM suggestions_repas WHERE id_utilisateur = ?" | |
| try: | |
| db_manager.cursor.execute(query, (user_id,)) | |
| results = db_manager.cursor.fetchall() | |
| if results: | |
| print("✅ Recettes et ingrédients trouvés en base :") | |
| for row in results: | |
| print(f"📌 Recette: {row[0]} - Ingrédients: {row[1]}") | |
| else: | |
| print("⚠️ Aucune recette avec ingrédients trouvée en base.") | |
| except sqlite3.Error as err: | |
| print(f"❌ Erreur lors de la récupération des recettes : {err}") | |