api_search_articles / script /1_create_dataset.py
Loren's picture
Upload 8 files
5327a45 verified
raw
history blame
6.69 kB
##############################################################################################
### Script de création de la base de données articles à partir du fichier parquet,
### correspondant au jeu d'essai : https://www.kaggle.com/code/fabiochiusano/medium-articles-simple-data-analysis
### Téléchargement du csv puis conversion en Parquet avec compression snappy :
### df = pd.read_csv("medium_articles.csv")
### df.to_parquet("medium_articles.parquet", engine="pyarrow", compression="snappy")
###
### Le fichier a été uploadé dans un dataset HF : Loren/articles_db
###
### Ce script
### - crée une base SQLite articles.db constituée des 3 tables : tags, articles, et tag_article
### - l'upload dans le dataset HF Loren/articles_db
### - crée les fichiers Parquet compressés à partir des tables SQLite
### - l'upload dans le dataset HF Loren/articles_database
###
### 👉 Ils peuvent alors être utilisés par un space Hugging Face
##############################################################################################
import sqlite3
import pandas as pd
import os
from dotenv import load_dotenv
import itertools
import ast
import uuid
from huggingface_hub import hf_hub_download, upload_file
from pathlib import Path
from collections import Counter
# Initialisations
print("Initialisations ...")
load_dotenv()
HF_TOKEN = os.getenv('API_HF_TOKEN')
# Constantes
MIN_COUNT = 5 # nombre minimum d'occurrences pour qu'un tag soit conservé
DATA_DIR = Path("../../Data") # dossier parent du script
REPO_ID_DB = "Loren/articles_db" # dataset HF
REPO_ID = "Loren/articles_database" # dataset HF
DB_NAME = 'articles.db'
SQLITE_FILE = DATA_DIR / DB_NAME
LIST_TABLES = ["articles", "tags", "tag_article"]
PARQUET_DIR = DATA_DIR / "parquet_tables"
# Chargement des données
parquet_path = hf_hub_download(repo_id=REPO_ID_DB,
filename="medium_articles.parquet",
repo_type="dataset")
# Créer les dossiers s'ils n'existent pas
DATA_DIR.mkdir(exist_ok=True)
PARQUET_DIR.mkdir(exist_ok=True)
# Chargement des données
print("Chargement des données ...")
df = pd.read_parquet(parquet_path)
# Initialisations de la base SQLite
print("Initialisations de la base SQLite ...")
conn = sqlite3.connect(SQLITE_FILE)
cur = conn.cursor()
# Suppression des anciennes tables
cur.execute("DROP TABLE IF EXISTS tag_article")
cur.execute("DROP TABLE IF EXISTS tags")
cur.execute("DROP TABLE IF EXISTS articles")
# Création des tables Articles, Tags, et de la table d'association articles <-> tags
cur.execute("""
CREATE TABLE articles (
article_id TEXT PRIMARY KEY, -- UUID
article_title TEXT,
article_text TEXT,
article_url TEXT,
article_authors TEXT,
article_date TEXT -- YYYY-MM-DD
)""")
cur.execute("""
CREATE TABLE tags (
tag_id INTEGER PRIMARY KEY AUTOINCREMENT,
tag_name TEXT UNIQUE
)""")
cur.execute("""
CREATE TABLE tag_article (
tag_article_id INTEGER PRIMARY KEY AUTOINCREMENT,
article_id TEXT,
tag_id INTEGER,
FOREIGN KEY(article_id) REFERENCES articles(article_id),
FOREIGN KEY(tag_id) REFERENCES tags(tag_id)
)""")
# Extraction des tags en une liste
print("Extraction des tags en une liste ...")
df['list_tags'] = df['tags'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else [])
# Extraire tous les tags uniques
all_tags = list(itertools.chain.from_iterable(df['list_tags']))
# Comptage du nombre d'occurrences de chaque tag
tag_counts = Counter(all_tags)
# On ne va conserver que les tags avec au moins 100 occurrences
list_tags = [tag for tag, count in tag_counts.items() if count >= MIN_COUNT]
# Insertion des tags dans la table
print("Insertion des tags dans la table ...")
cur.executemany("INSERT INTO tags (tag_name) VALUES (?)", [(tag,) for tag in list_tags])
# Récupération des correspondances tag_name -> tag_id
print("Récupération des correspondances tag_name -> tag_id ...")
cur.execute("SELECT tag_id, tag_name FROM tags")
dict_tag_map = {tag_name: tag_id for tag_id, tag_name in cur.fetchall()}
# Insertion des articles et table d'association dans les tables
print("Insertion des articles et table d'association dans les tables ...")
for _, row in df.iterrows():
# Détermination de l'id article
article_id = str(uuid.uuid4())
# Extraction de la date du timestamp
date_value = None
if pd.notna(row["timestamp"]):
try:
date_value = str(pd.to_datetime(row["timestamp"]).date())
except Exception:
date_value = None
# Insertion dans la table Articles
cur.execute("""
INSERT INTO articles (article_id, article_title, article_text, article_url, article_authors, article_date)
VALUES (?, ?, ?, ?, ?, ?)""",
(article_id, row["title"], row["text"], row["url"], row["authors"], date_value))
# Association aux tags
for tag_name in row['list_tags']:
try:
tag_id = dict_tag_map[tag_name]
cur.execute("INSERT INTO tag_article (article_id, tag_id) VALUES (?, ?)",
(article_id, tag_id))
except:
pass
print("-> ", len(list_tags), " tags")
cur.execute("SELECT COUNT(*) FROM tag_article")
nb_lignes = cur.fetchone()[0]
print("-> ", nb_lignes, " associations articles <-> tags")
print("-> ", len(df), " articles")
# Commit
print("Commit ...")
conn.commit()
# Upload dans le dataset hugging face
print("Upload base Sqlite dans le dataset hugging face ...")
upload_file(
path_or_fileobj=SQLITE_FILE,
path_in_repo=DB_NAME,
repo_id=REPO_ID_DB,
repo_type="dataset",
token=HF_TOKEN
)
# Création des fichiers Parquet compressés
print("Création des fichiers Parquet compressés ...")
parquet_files = []
for table in LIST_TABLES:
df = pd.read_sql_query(f"SELECT * FROM {table}", conn)
parquet_path = PARQUET_DIR / f"{table}.parquet"
df.to_parquet(parquet_path, engine="pyarrow", index=False, compression="snappy")
parquet_files.append(parquet_path)
# Upload des fichiers Parquet vers HF
print("Upload des fichiers Parquet dans le dataset hugging face ...")
for parquet_file in parquet_files:
print(f"Uploading {parquet_file.name} ...")
upload_file(
path_or_fileobj=parquet_file,
path_in_repo=parquet_file.name,
repo_id=REPO_ID,
repo_type="dataset",
token=HF_TOKEN
)
print("Upload terminé ✅")
conn.close()
print("Traitement terminé.")