api_search_articles / script /2_check_dataset.py
Loren's picture
Upload 8 files
5327a45 verified
##############################################################################################
### Script de vérification des url des articles
###
### Ce script
### - charge la table articles depuis le dataset HF Loren/articles_database
### - check si les url des articles sont des pages actives (status code 200)
### - ajoute cette information dans une colonne article_online
### - crée les fichiers Parquet compressés à partir de la table créée
### - l'upload dans le dataset HF Loren/articles_database
###
### 👉 Il peut alors être utilisé par un space Hugging Face
##############################################################################################
import os
from dotenv import load_dotenv
from huggingface_hub import hf_hub_download, upload_file
from pathlib import Path
from typing import List, Any, Tuple
import duckdb
import asyncio
import aiohttp
from tqdm.asyncio import tqdm_asyncio
# Fonctions
async def check_url(session: aiohttp.ClientSession, sem: asyncio.Semaphore, url: str) -> bool:
"""
Vérifie si une URL est accessible (retourne un code HTTP < 400).
Args:
session (aiohttp.ClientSession): Session HTTP réutilisable pour les requêtes.
sem (asyncio.Semaphore): Sémaphore pour limiter le nombre de requêtes simultanées.
url (str): L'URL à vérifier.
Returns:
bool: True si l’URL répond avec un code < 400, sinon False.
"""
if not url:
return False
async with sem: # limite de concurrence
try:
async with session.head(url, allow_redirects=True, timeout=TIMEOUT) as resp:
return resp.status < 400
except Exception:
return False
async def process_batch(batch: List[Tuple[Any, ...]]) -> List[bool]:
"""
Traite un batch d’URLs avec une limite de requêtes simultanées.
Args:
batch (List[Tuple[Any, ...]]): Liste de tuples représentant les lignes d'articles.
Chaque tuple doit contenir au moins une colonne d’URL à l’index 3.
Returns:
List[bool]: Liste de statuts (True/False) correspondant à l’accessibilité de chaque URL.
"""
sem = asyncio.Semaphore(MAX_CONCURRENCY)
async with aiohttp.ClientSession() as session:
tasks = [
check_url(session, sem, row[3]) # row[3] = article_url
for row in batch
]
return await tqdm_asyncio.gather(*tasks)
async def main() -> None:
"""
Exécute le traitement complet :
- Récupère les articles par batch depuis la base.
- Vérifie la disponibilité des URLs.
- Insère les résultats enrichis dans une table de sortie.
Returns:
None
"""
total_rows = con.execute("SELECT COUNT(*) FROM articles").fetchone()[0]
total_batches = (total_rows + BATCH_SIZE - 1) // BATCH_SIZE
print(f"🔍 {total_rows} lignes à traiter ({total_batches} batchs de {BATCH_SIZE})")
for batch_index in range(total_batches):
offset = batch_index * BATCH_SIZE
# Charger un batch depuis la table
batch = con.execute(f"""
SELECT * FROM articles
LIMIT {BATCH_SIZE} OFFSET {offset}
""").fetchall()
# Vérifier les URLs
online_statuses = await process_batch(batch)
# Préparer les données enrichies
enriched_rows = [
(*row, status)
for row, status in zip(batch, online_statuses)
]
# Insérer dans la table physique
con.executemany(f"""
INSERT INTO {TABLE_OUTPUT} VALUES (?, ?, ?, ?, ?, ?, ?)
""", enriched_rows)
print(f"✅ Batch {batch_index + 1}/{total_batches} traité ({len(batch)} lignes)")
print("🎉 Traitement terminé !")
print(f"Résultat stocké dans la table '{TABLE_OUTPUT}'")
#
if __name__ == "__main__":
# Initialisations
print("Initialisations ...")
load_dotenv()
HF_TOKEN = os.getenv('API_HF_TOKEN')
# Constantes
DATA_DIR = Path("../../Data") # dossier parent du script
REPO_ID = "Loren/articles_database" # dataset HF
PARQUET_DIR = DATA_DIR / "parquet_tables"
REPO_ID = "Loren/articles_database"
CACHE_DIR = "/tmp"
TABLE_OUTPUT = "articles_checked" # Table de sortie
BATCH_SIZE = 1000
MAX_CONCURRENCY = 100
TIMEOUT = 5 # secondes
parquet_path = PARQUET_DIR / f"{TABLE_OUTPUT}.parquet"
os.makedirs(CACHE_DIR, exist_ok=True)
# Rediriger le cache HF globalement
os.environ["HF_HOME"] = CACHE_DIR
os.environ["HF_DATASETS_CACHE"] = CACHE_DIR
os.environ["TRANSFORMERS_CACHE"] = CACHE_DIR
# Téléchargement des fichiers Parquet depuis Hugging Face
articles_parquet = hf_hub_download(
repo_id=REPO_ID,
filename="articles.parquet",
repo_type="dataset",
cache_dir=CACHE_DIR)
# Connexion DuckDB en mémoire
con = duckdb.connect()
# Créer des tables DuckDB directement à partir des fichiers Parquet
con.execute(f"CREATE VIEW articles AS SELECT * FROM parquet_scan('{articles_parquet}')")
# Créer la table cible
con.execute(f"""CREATE TABLE {TABLE_OUTPUT} AS
SELECT *, NULL::BOOLEAN AS article_online
FROM articles
WHERE 1=0""")
# Traitement principal
asyncio.run(main())
# Sauvegarde du résultat dans un fichier Parquet
con.execute(f"""COPY {TABLE_OUTPUT} TO '{parquet_path}'
(FORMAT PARQUET, , COMPRESSION 'SNAPPY')""")
print(f"✅ Fichier Parquet créé : {parquet_path}")
# Upload des fichiers Parquet vers HF
print(f"Uploading {parquet_path} ...")
upload_file(
path_or_fileobj=parquet_path,
path_in_repo=f"{TABLE_OUTPUT}.parquet",
repo_id=REPO_ID,
repo_type="dataset",
token=HF_TOKEN
)
print("✅ Traitement terminé.")