############################################################################################## ### Script de création de la base de données articles à partir du fichier parquet, ### correspondant au jeu d'essai : https://www.kaggle.com/code/fabiochiusano/medium-articles-simple-data-analysis ### Téléchargement du csv puis conversion en Parquet avec compression snappy : ### df = pd.read_csv("medium_articles.csv") ### df.to_parquet("medium_articles.parquet", engine="pyarrow", compression="snappy") ### ### Le fichier a été uploadé dans un dataset HF : Loren/articles_db ### ### Ce script ### - crée une base SQLite articles.db constituée des 3 tables : tags, articles, et tag_article ### - l'upload dans le dataset HF Loren/articles_db ### - crée les fichiers Parquet compressés à partir des tables SQLite ### - l'upload dans le dataset HF Loren/articles_database ### ### 👉 Ils peuvent alors être utilisés par un space Hugging Face ############################################################################################## import sqlite3 import pandas as pd import os from dotenv import load_dotenv import itertools import ast import uuid from huggingface_hub import hf_hub_download, upload_file from pathlib import Path from collections import Counter # Initialisations print("Initialisations ...") load_dotenv() HF_TOKEN = os.getenv('API_HF_TOKEN') # Constantes MIN_COUNT = 5 # nombre minimum d'occurrences pour qu'un tag soit conservé DATA_DIR = Path("../../Data") # dossier parent du script REPO_ID_DB = "Loren/articles_db" # dataset HF REPO_ID = "Loren/articles_database" # dataset HF DB_NAME = 'articles.db' SQLITE_FILE = DATA_DIR / DB_NAME LIST_TABLES = ["articles", "tags", "tag_article"] PARQUET_DIR = DATA_DIR / "parquet_tables" # Chargement des données parquet_path = hf_hub_download(repo_id=REPO_ID_DB, filename="medium_articles.parquet", repo_type="dataset") # Créer les dossiers s'ils n'existent pas DATA_DIR.mkdir(exist_ok=True) PARQUET_DIR.mkdir(exist_ok=True) # Chargement des données print("Chargement des données ...") df = pd.read_parquet(parquet_path) # Initialisations de la base SQLite print("Initialisations de la base SQLite ...") conn = sqlite3.connect(SQLITE_FILE) cur = conn.cursor() # Suppression des anciennes tables cur.execute("DROP TABLE IF EXISTS tag_article") cur.execute("DROP TABLE IF EXISTS tags") cur.execute("DROP TABLE IF EXISTS articles") # Création des tables Articles, Tags, et de la table d'association articles <-> tags cur.execute(""" CREATE TABLE articles ( article_id TEXT PRIMARY KEY, -- UUID article_title TEXT, article_text TEXT, article_url TEXT, article_authors TEXT, article_date TEXT -- YYYY-MM-DD )""") cur.execute(""" CREATE TABLE tags ( tag_id INTEGER PRIMARY KEY AUTOINCREMENT, tag_name TEXT UNIQUE )""") cur.execute(""" CREATE TABLE tag_article ( tag_article_id INTEGER PRIMARY KEY AUTOINCREMENT, article_id TEXT, tag_id INTEGER, FOREIGN KEY(article_id) REFERENCES articles(article_id), FOREIGN KEY(tag_id) REFERENCES tags(tag_id) )""") # Extraction des tags en une liste print("Extraction des tags en une liste ...") df['list_tags'] = df['tags'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else []) # Extraire tous les tags uniques all_tags = list(itertools.chain.from_iterable(df['list_tags'])) # Comptage du nombre d'occurrences de chaque tag tag_counts = Counter(all_tags) # On ne va conserver que les tags avec au moins 100 occurrences list_tags = [tag for tag, count in tag_counts.items() if count >= MIN_COUNT] # Insertion des tags dans la table print("Insertion des tags dans la table ...") cur.executemany("INSERT INTO tags (tag_name) VALUES (?)", [(tag,) for tag in list_tags]) # Récupération des correspondances tag_name -> tag_id print("Récupération des correspondances tag_name -> tag_id ...") cur.execute("SELECT tag_id, tag_name FROM tags") dict_tag_map = {tag_name: tag_id for tag_id, tag_name in cur.fetchall()} # Insertion des articles et table d'association dans les tables print("Insertion des articles et table d'association dans les tables ...") for _, row in df.iterrows(): # Détermination de l'id article article_id = str(uuid.uuid4()) # Extraction de la date du timestamp date_value = None if pd.notna(row["timestamp"]): try: date_value = str(pd.to_datetime(row["timestamp"]).date()) except Exception: date_value = None # Insertion dans la table Articles cur.execute(""" INSERT INTO articles (article_id, article_title, article_text, article_url, article_authors, article_date) VALUES (?, ?, ?, ?, ?, ?)""", (article_id, row["title"], row["text"], row["url"], row["authors"], date_value)) # Association aux tags for tag_name in row['list_tags']: try: tag_id = dict_tag_map[tag_name] cur.execute("INSERT INTO tag_article (article_id, tag_id) VALUES (?, ?)", (article_id, tag_id)) except: pass print("-> ", len(list_tags), " tags") cur.execute("SELECT COUNT(*) FROM tag_article") nb_lignes = cur.fetchone()[0] print("-> ", nb_lignes, " associations articles <-> tags") print("-> ", len(df), " articles") # Commit print("Commit ...") conn.commit() # Upload dans le dataset hugging face print("Upload base Sqlite dans le dataset hugging face ...") upload_file( path_or_fileobj=SQLITE_FILE, path_in_repo=DB_NAME, repo_id=REPO_ID_DB, repo_type="dataset", token=HF_TOKEN ) # Création des fichiers Parquet compressés print("Création des fichiers Parquet compressés ...") parquet_files = [] for table in LIST_TABLES: df = pd.read_sql_query(f"SELECT * FROM {table}", conn) parquet_path = PARQUET_DIR / f"{table}.parquet" df.to_parquet(parquet_path, engine="pyarrow", index=False, compression="snappy") parquet_files.append(parquet_path) # Upload des fichiers Parquet vers HF print("Upload des fichiers Parquet dans le dataset hugging face ...") for parquet_file in parquet_files: print(f"Uploading {parquet_file.name} ...") upload_file( path_or_fileobj=parquet_file, path_in_repo=parquet_file.name, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN ) print("Upload terminé ✅") conn.close() print("Traitement terminé.")