Spaces:
Sleeping
Sleeping
File size: 6,692 Bytes
5327a45 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
##############################################################################################
### Script de création de la base de données articles à partir du fichier parquet,
### correspondant au jeu d'essai : https://www.kaggle.com/code/fabiochiusano/medium-articles-simple-data-analysis
### Téléchargement du csv puis conversion en Parquet avec compression snappy :
### df = pd.read_csv("medium_articles.csv")
### df.to_parquet("medium_articles.parquet", engine="pyarrow", compression="snappy")
###
### Le fichier a été uploadé dans un dataset HF : Loren/articles_db
###
### Ce script
### - crée une base SQLite articles.db constituée des 3 tables : tags, articles, et tag_article
### - l'upload dans le dataset HF Loren/articles_db
### - crée les fichiers Parquet compressés à partir des tables SQLite
### - l'upload dans le dataset HF Loren/articles_database
###
### 👉 Ils peuvent alors être utilisés par un space Hugging Face
##############################################################################################
import sqlite3
import pandas as pd
import os
from dotenv import load_dotenv
import itertools
import ast
import uuid
from huggingface_hub import hf_hub_download, upload_file
from pathlib import Path
from collections import Counter
# Initialisations
print("Initialisations ...")
load_dotenv()
HF_TOKEN = os.getenv('API_HF_TOKEN')
# Constantes
MIN_COUNT = 5 # nombre minimum d'occurrences pour qu'un tag soit conservé
DATA_DIR = Path("../../Data") # dossier parent du script
REPO_ID_DB = "Loren/articles_db" # dataset HF
REPO_ID = "Loren/articles_database" # dataset HF
DB_NAME = 'articles.db'
SQLITE_FILE = DATA_DIR / DB_NAME
LIST_TABLES = ["articles", "tags", "tag_article"]
PARQUET_DIR = DATA_DIR / "parquet_tables"
# Chargement des données
parquet_path = hf_hub_download(repo_id=REPO_ID_DB,
filename="medium_articles.parquet",
repo_type="dataset")
# Créer les dossiers s'ils n'existent pas
DATA_DIR.mkdir(exist_ok=True)
PARQUET_DIR.mkdir(exist_ok=True)
# Chargement des données
print("Chargement des données ...")
df = pd.read_parquet(parquet_path)
# Initialisations de la base SQLite
print("Initialisations de la base SQLite ...")
conn = sqlite3.connect(SQLITE_FILE)
cur = conn.cursor()
# Suppression des anciennes tables
cur.execute("DROP TABLE IF EXISTS tag_article")
cur.execute("DROP TABLE IF EXISTS tags")
cur.execute("DROP TABLE IF EXISTS articles")
# Création des tables Articles, Tags, et de la table d'association articles <-> tags
cur.execute("""
CREATE TABLE articles (
article_id TEXT PRIMARY KEY, -- UUID
article_title TEXT,
article_text TEXT,
article_url TEXT,
article_authors TEXT,
article_date TEXT -- YYYY-MM-DD
)""")
cur.execute("""
CREATE TABLE tags (
tag_id INTEGER PRIMARY KEY AUTOINCREMENT,
tag_name TEXT UNIQUE
)""")
cur.execute("""
CREATE TABLE tag_article (
tag_article_id INTEGER PRIMARY KEY AUTOINCREMENT,
article_id TEXT,
tag_id INTEGER,
FOREIGN KEY(article_id) REFERENCES articles(article_id),
FOREIGN KEY(tag_id) REFERENCES tags(tag_id)
)""")
# Extraction des tags en une liste
print("Extraction des tags en une liste ...")
df['list_tags'] = df['tags'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else [])
# Extraire tous les tags uniques
all_tags = list(itertools.chain.from_iterable(df['list_tags']))
# Comptage du nombre d'occurrences de chaque tag
tag_counts = Counter(all_tags)
# On ne va conserver que les tags avec au moins 100 occurrences
list_tags = [tag for tag, count in tag_counts.items() if count >= MIN_COUNT]
# Insertion des tags dans la table
print("Insertion des tags dans la table ...")
cur.executemany("INSERT INTO tags (tag_name) VALUES (?)", [(tag,) for tag in list_tags])
# Récupération des correspondances tag_name -> tag_id
print("Récupération des correspondances tag_name -> tag_id ...")
cur.execute("SELECT tag_id, tag_name FROM tags")
dict_tag_map = {tag_name: tag_id for tag_id, tag_name in cur.fetchall()}
# Insertion des articles et table d'association dans les tables
print("Insertion des articles et table d'association dans les tables ...")
for _, row in df.iterrows():
# Détermination de l'id article
article_id = str(uuid.uuid4())
# Extraction de la date du timestamp
date_value = None
if pd.notna(row["timestamp"]):
try:
date_value = str(pd.to_datetime(row["timestamp"]).date())
except Exception:
date_value = None
# Insertion dans la table Articles
cur.execute("""
INSERT INTO articles (article_id, article_title, article_text, article_url, article_authors, article_date)
VALUES (?, ?, ?, ?, ?, ?)""",
(article_id, row["title"], row["text"], row["url"], row["authors"], date_value))
# Association aux tags
for tag_name in row['list_tags']:
try:
tag_id = dict_tag_map[tag_name]
cur.execute("INSERT INTO tag_article (article_id, tag_id) VALUES (?, ?)",
(article_id, tag_id))
except:
pass
print("-> ", len(list_tags), " tags")
cur.execute("SELECT COUNT(*) FROM tag_article")
nb_lignes = cur.fetchone()[0]
print("-> ", nb_lignes, " associations articles <-> tags")
print("-> ", len(df), " articles")
# Commit
print("Commit ...")
conn.commit()
# Upload dans le dataset hugging face
print("Upload base Sqlite dans le dataset hugging face ...")
upload_file(
path_or_fileobj=SQLITE_FILE,
path_in_repo=DB_NAME,
repo_id=REPO_ID_DB,
repo_type="dataset",
token=HF_TOKEN
)
# Création des fichiers Parquet compressés
print("Création des fichiers Parquet compressés ...")
parquet_files = []
for table in LIST_TABLES:
df = pd.read_sql_query(f"SELECT * FROM {table}", conn)
parquet_path = PARQUET_DIR / f"{table}.parquet"
df.to_parquet(parquet_path, engine="pyarrow", index=False, compression="snappy")
parquet_files.append(parquet_path)
# Upload des fichiers Parquet vers HF
print("Upload des fichiers Parquet dans le dataset hugging face ...")
for parquet_file in parquet_files:
print(f"Uploading {parquet_file.name} ...")
upload_file(
path_or_fileobj=parquet_file,
path_in_repo=parquet_file.name,
repo_id=REPO_ID,
repo_type="dataset",
token=HF_TOKEN
)
print("Upload terminé ✅")
conn.close()
print("Traitement terminé.") |