Spaces:
Sleeping
Sleeping
| ############################################################################################## | |
| ### Script de création de la base de données articles à partir du fichier parquet, | |
| ### correspondant au jeu d'essai : https://www.kaggle.com/code/fabiochiusano/medium-articles-simple-data-analysis | |
| ### Téléchargement du csv puis conversion en Parquet avec compression snappy : | |
| ### df = pd.read_csv("medium_articles.csv") | |
| ### df.to_parquet("medium_articles.parquet", engine="pyarrow", compression="snappy") | |
| ### | |
| ### Le fichier a été uploadé dans un dataset HF : Loren/articles_db | |
| ### | |
| ### Ce script | |
| ### - crée une base SQLite articles.db constituée des 3 tables : tags, articles, et tag_article | |
| ### - l'upload dans le dataset HF Loren/articles_db | |
| ### - crée les fichiers Parquet compressés à partir des tables SQLite | |
| ### - l'upload dans le dataset HF Loren/articles_database | |
| ### | |
| ### 👉 Ils peuvent alors être utilisés par un space Hugging Face | |
| ############################################################################################## | |
| import sqlite3 | |
| import pandas as pd | |
| import os | |
| from dotenv import load_dotenv | |
| import itertools | |
| import ast | |
| import uuid | |
| from huggingface_hub import hf_hub_download, upload_file | |
| from pathlib import Path | |
| from collections import Counter | |
| # Initialisations | |
| print("Initialisations ...") | |
| load_dotenv() | |
| HF_TOKEN = os.getenv('API_HF_TOKEN') | |
| # Constantes | |
| MIN_COUNT = 5 # nombre minimum d'occurrences pour qu'un tag soit conservé | |
| DATA_DIR = Path("../../Data") # dossier parent du script | |
| REPO_ID_DB = "Loren/articles_db" # dataset HF | |
| REPO_ID = "Loren/articles_database" # dataset HF | |
| DB_NAME = 'articles.db' | |
| SQLITE_FILE = DATA_DIR / DB_NAME | |
| LIST_TABLES = ["articles", "tags", "tag_article"] | |
| PARQUET_DIR = DATA_DIR / "parquet_tables" | |
| # Chargement des données | |
| parquet_path = hf_hub_download(repo_id=REPO_ID_DB, | |
| filename="medium_articles.parquet", | |
| repo_type="dataset") | |
| # Créer les dossiers s'ils n'existent pas | |
| DATA_DIR.mkdir(exist_ok=True) | |
| PARQUET_DIR.mkdir(exist_ok=True) | |
| # Chargement des données | |
| print("Chargement des données ...") | |
| df = pd.read_parquet(parquet_path) | |
| # Initialisations de la base SQLite | |
| print("Initialisations de la base SQLite ...") | |
| conn = sqlite3.connect(SQLITE_FILE) | |
| cur = conn.cursor() | |
| # Suppression des anciennes tables | |
| cur.execute("DROP TABLE IF EXISTS tag_article") | |
| cur.execute("DROP TABLE IF EXISTS tags") | |
| cur.execute("DROP TABLE IF EXISTS articles") | |
| # Création des tables Articles, Tags, et de la table d'association articles <-> tags | |
| cur.execute(""" | |
| CREATE TABLE articles ( | |
| article_id TEXT PRIMARY KEY, -- UUID | |
| article_title TEXT, | |
| article_text TEXT, | |
| article_url TEXT, | |
| article_authors TEXT, | |
| article_date TEXT -- YYYY-MM-DD | |
| )""") | |
| cur.execute(""" | |
| CREATE TABLE tags ( | |
| tag_id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| tag_name TEXT UNIQUE | |
| )""") | |
| cur.execute(""" | |
| CREATE TABLE tag_article ( | |
| tag_article_id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| article_id TEXT, | |
| tag_id INTEGER, | |
| FOREIGN KEY(article_id) REFERENCES articles(article_id), | |
| FOREIGN KEY(tag_id) REFERENCES tags(tag_id) | |
| )""") | |
| # Extraction des tags en une liste | |
| print("Extraction des tags en une liste ...") | |
| df['list_tags'] = df['tags'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else []) | |
| # Extraire tous les tags uniques | |
| all_tags = list(itertools.chain.from_iterable(df['list_tags'])) | |
| # Comptage du nombre d'occurrences de chaque tag | |
| tag_counts = Counter(all_tags) | |
| # On ne va conserver que les tags avec au moins 100 occurrences | |
| list_tags = [tag for tag, count in tag_counts.items() if count >= MIN_COUNT] | |
| # Insertion des tags dans la table | |
| print("Insertion des tags dans la table ...") | |
| cur.executemany("INSERT INTO tags (tag_name) VALUES (?)", [(tag,) for tag in list_tags]) | |
| # Récupération des correspondances tag_name -> tag_id | |
| print("Récupération des correspondances tag_name -> tag_id ...") | |
| cur.execute("SELECT tag_id, tag_name FROM tags") | |
| dict_tag_map = {tag_name: tag_id for tag_id, tag_name in cur.fetchall()} | |
| # Insertion des articles et table d'association dans les tables | |
| print("Insertion des articles et table d'association dans les tables ...") | |
| for _, row in df.iterrows(): | |
| # Détermination de l'id article | |
| article_id = str(uuid.uuid4()) | |
| # Extraction de la date du timestamp | |
| date_value = None | |
| if pd.notna(row["timestamp"]): | |
| try: | |
| date_value = str(pd.to_datetime(row["timestamp"]).date()) | |
| except Exception: | |
| date_value = None | |
| # Insertion dans la table Articles | |
| cur.execute(""" | |
| INSERT INTO articles (article_id, article_title, article_text, article_url, article_authors, article_date) | |
| VALUES (?, ?, ?, ?, ?, ?)""", | |
| (article_id, row["title"], row["text"], row["url"], row["authors"], date_value)) | |
| # Association aux tags | |
| for tag_name in row['list_tags']: | |
| try: | |
| tag_id = dict_tag_map[tag_name] | |
| cur.execute("INSERT INTO tag_article (article_id, tag_id) VALUES (?, ?)", | |
| (article_id, tag_id)) | |
| except: | |
| pass | |
| print("-> ", len(list_tags), " tags") | |
| cur.execute("SELECT COUNT(*) FROM tag_article") | |
| nb_lignes = cur.fetchone()[0] | |
| print("-> ", nb_lignes, " associations articles <-> tags") | |
| print("-> ", len(df), " articles") | |
| # Commit | |
| print("Commit ...") | |
| conn.commit() | |
| # Upload dans le dataset hugging face | |
| print("Upload base Sqlite dans le dataset hugging face ...") | |
| upload_file( | |
| path_or_fileobj=SQLITE_FILE, | |
| path_in_repo=DB_NAME, | |
| repo_id=REPO_ID_DB, | |
| repo_type="dataset", | |
| token=HF_TOKEN | |
| ) | |
| # Création des fichiers Parquet compressés | |
| print("Création des fichiers Parquet compressés ...") | |
| parquet_files = [] | |
| for table in LIST_TABLES: | |
| df = pd.read_sql_query(f"SELECT * FROM {table}", conn) | |
| parquet_path = PARQUET_DIR / f"{table}.parquet" | |
| df.to_parquet(parquet_path, engine="pyarrow", index=False, compression="snappy") | |
| parquet_files.append(parquet_path) | |
| # Upload des fichiers Parquet vers HF | |
| print("Upload des fichiers Parquet dans le dataset hugging face ...") | |
| for parquet_file in parquet_files: | |
| print(f"Uploading {parquet_file.name} ...") | |
| upload_file( | |
| path_or_fileobj=parquet_file, | |
| path_in_repo=parquet_file.name, | |
| repo_id=REPO_ID, | |
| repo_type="dataset", | |
| token=HF_TOKEN | |
| ) | |
| print("Upload terminé ✅") | |
| conn.close() | |
| print("Traitement terminé.") |