############################################################################################## ### Script de vérification des url des articles ### ### Ce script ### - charge la table articles depuis le dataset HF Loren/articles_database ### - check si les url des articles sont des pages actives (status code 200) ### - ajoute cette information dans une colonne article_online ### - crée les fichiers Parquet compressés à partir de la table créée ### - l'upload dans le dataset HF Loren/articles_database ### ### 👉 Il peut alors être utilisé par un space Hugging Face ############################################################################################## import os from dotenv import load_dotenv from huggingface_hub import hf_hub_download, upload_file from pathlib import Path from typing import List, Any, Tuple import duckdb import asyncio import aiohttp from tqdm.asyncio import tqdm_asyncio # Fonctions async def check_url(session: aiohttp.ClientSession, sem: asyncio.Semaphore, url: str) -> bool: """ Vérifie si une URL est accessible (retourne un code HTTP < 400). Args: session (aiohttp.ClientSession): Session HTTP réutilisable pour les requêtes. sem (asyncio.Semaphore): Sémaphore pour limiter le nombre de requêtes simultanées. url (str): L'URL à vérifier. Returns: bool: True si l’URL répond avec un code < 400, sinon False. """ if not url: return False async with sem: # limite de concurrence try: async with session.head(url, allow_redirects=True, timeout=TIMEOUT) as resp: return resp.status < 400 except Exception: return False async def process_batch(batch: List[Tuple[Any, ...]]) -> List[bool]: """ Traite un batch d’URLs avec une limite de requêtes simultanées. Args: batch (List[Tuple[Any, ...]]): Liste de tuples représentant les lignes d'articles. Chaque tuple doit contenir au moins une colonne d’URL à l’index 3. Returns: List[bool]: Liste de statuts (True/False) correspondant à l’accessibilité de chaque URL. """ sem = asyncio.Semaphore(MAX_CONCURRENCY) async with aiohttp.ClientSession() as session: tasks = [ check_url(session, sem, row[3]) # row[3] = article_url for row in batch ] return await tqdm_asyncio.gather(*tasks) async def main() -> None: """ Exécute le traitement complet : - Récupère les articles par batch depuis la base. - Vérifie la disponibilité des URLs. - Insère les résultats enrichis dans une table de sortie. Returns: None """ total_rows = con.execute("SELECT COUNT(*) FROM articles").fetchone()[0] total_batches = (total_rows + BATCH_SIZE - 1) // BATCH_SIZE print(f"🔍 {total_rows} lignes à traiter ({total_batches} batchs de {BATCH_SIZE})") for batch_index in range(total_batches): offset = batch_index * BATCH_SIZE # Charger un batch depuis la table batch = con.execute(f""" SELECT * FROM articles LIMIT {BATCH_SIZE} OFFSET {offset} """).fetchall() # Vérifier les URLs online_statuses = await process_batch(batch) # Préparer les données enrichies enriched_rows = [ (*row, status) for row, status in zip(batch, online_statuses) ] # Insérer dans la table physique con.executemany(f""" INSERT INTO {TABLE_OUTPUT} VALUES (?, ?, ?, ?, ?, ?, ?) """, enriched_rows) print(f"✅ Batch {batch_index + 1}/{total_batches} traité ({len(batch)} lignes)") print("🎉 Traitement terminé !") print(f"Résultat stocké dans la table '{TABLE_OUTPUT}'") # if __name__ == "__main__": # Initialisations print("Initialisations ...") load_dotenv() HF_TOKEN = os.getenv('API_HF_TOKEN') # Constantes DATA_DIR = Path("../../Data") # dossier parent du script REPO_ID = "Loren/articles_database" # dataset HF PARQUET_DIR = DATA_DIR / "parquet_tables" REPO_ID = "Loren/articles_database" CACHE_DIR = "/tmp" TABLE_OUTPUT = "articles_checked" # Table de sortie BATCH_SIZE = 1000 MAX_CONCURRENCY = 100 TIMEOUT = 5 # secondes parquet_path = PARQUET_DIR / f"{TABLE_OUTPUT}.parquet" os.makedirs(CACHE_DIR, exist_ok=True) # Rediriger le cache HF globalement os.environ["HF_HOME"] = CACHE_DIR os.environ["HF_DATASETS_CACHE"] = CACHE_DIR os.environ["TRANSFORMERS_CACHE"] = CACHE_DIR # Téléchargement des fichiers Parquet depuis Hugging Face articles_parquet = hf_hub_download( repo_id=REPO_ID, filename="articles.parquet", repo_type="dataset", cache_dir=CACHE_DIR) # Connexion DuckDB en mémoire con = duckdb.connect() # Créer des tables DuckDB directement à partir des fichiers Parquet con.execute(f"CREATE VIEW articles AS SELECT * FROM parquet_scan('{articles_parquet}')") # Créer la table cible con.execute(f"""CREATE TABLE {TABLE_OUTPUT} AS SELECT *, NULL::BOOLEAN AS article_online FROM articles WHERE 1=0""") # Traitement principal asyncio.run(main()) # Sauvegarde du résultat dans un fichier Parquet con.execute(f"""COPY {TABLE_OUTPUT} TO '{parquet_path}' (FORMAT PARQUET, , COMPRESSION 'SNAPPY')""") print(f"✅ Fichier Parquet créé : {parquet_path}") # Upload des fichiers Parquet vers HF print(f"Uploading {parquet_path} ...") upload_file( path_or_fileobj=parquet_path, path_in_repo=f"{TABLE_OUTPUT}.parquet", repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN ) print("✅ Traitement terminé.")