Spaces:
Sleeping
Sleeping
| ############################################################################################## | |
| ### Script de vérification des url des articles | |
| ### | |
| ### Ce script | |
| ### - charge la table articles depuis le dataset HF Loren/articles_database | |
| ### - check si les url des articles sont des pages actives (status code 200) | |
| ### - ajoute cette information dans une colonne article_online | |
| ### - crée les fichiers Parquet compressés à partir de la table créée | |
| ### - l'upload dans le dataset HF Loren/articles_database | |
| ### | |
| ### 👉 Il peut alors être utilisé par un space Hugging Face | |
| ############################################################################################## | |
| import os | |
| from dotenv import load_dotenv | |
| from huggingface_hub import hf_hub_download, upload_file | |
| from pathlib import Path | |
| from typing import List, Any, Tuple | |
| import duckdb | |
| import asyncio | |
| import aiohttp | |
| from tqdm.asyncio import tqdm_asyncio | |
| # Fonctions | |
| async def check_url(session: aiohttp.ClientSession, sem: asyncio.Semaphore, url: str) -> bool: | |
| """ | |
| Vérifie si une URL est accessible (retourne un code HTTP < 400). | |
| Args: | |
| session (aiohttp.ClientSession): Session HTTP réutilisable pour les requêtes. | |
| sem (asyncio.Semaphore): Sémaphore pour limiter le nombre de requêtes simultanées. | |
| url (str): L'URL à vérifier. | |
| Returns: | |
| bool: True si l’URL répond avec un code < 400, sinon False. | |
| """ | |
| if not url: | |
| return False | |
| async with sem: # limite de concurrence | |
| try: | |
| async with session.head(url, allow_redirects=True, timeout=TIMEOUT) as resp: | |
| return resp.status < 400 | |
| except Exception: | |
| return False | |
| async def process_batch(batch: List[Tuple[Any, ...]]) -> List[bool]: | |
| """ | |
| Traite un batch d’URLs avec une limite de requêtes simultanées. | |
| Args: | |
| batch (List[Tuple[Any, ...]]): Liste de tuples représentant les lignes d'articles. | |
| Chaque tuple doit contenir au moins une colonne d’URL à l’index 3. | |
| Returns: | |
| List[bool]: Liste de statuts (True/False) correspondant à l’accessibilité de chaque URL. | |
| """ | |
| sem = asyncio.Semaphore(MAX_CONCURRENCY) | |
| async with aiohttp.ClientSession() as session: | |
| tasks = [ | |
| check_url(session, sem, row[3]) # row[3] = article_url | |
| for row in batch | |
| ] | |
| return await tqdm_asyncio.gather(*tasks) | |
| async def main() -> None: | |
| """ | |
| Exécute le traitement complet : | |
| - Récupère les articles par batch depuis la base. | |
| - Vérifie la disponibilité des URLs. | |
| - Insère les résultats enrichis dans une table de sortie. | |
| Returns: | |
| None | |
| """ | |
| total_rows = con.execute("SELECT COUNT(*) FROM articles").fetchone()[0] | |
| total_batches = (total_rows + BATCH_SIZE - 1) // BATCH_SIZE | |
| print(f"🔍 {total_rows} lignes à traiter ({total_batches} batchs de {BATCH_SIZE})") | |
| for batch_index in range(total_batches): | |
| offset = batch_index * BATCH_SIZE | |
| # Charger un batch depuis la table | |
| batch = con.execute(f""" | |
| SELECT * FROM articles | |
| LIMIT {BATCH_SIZE} OFFSET {offset} | |
| """).fetchall() | |
| # Vérifier les URLs | |
| online_statuses = await process_batch(batch) | |
| # Préparer les données enrichies | |
| enriched_rows = [ | |
| (*row, status) | |
| for row, status in zip(batch, online_statuses) | |
| ] | |
| # Insérer dans la table physique | |
| con.executemany(f""" | |
| INSERT INTO {TABLE_OUTPUT} VALUES (?, ?, ?, ?, ?, ?, ?) | |
| """, enriched_rows) | |
| print(f"✅ Batch {batch_index + 1}/{total_batches} traité ({len(batch)} lignes)") | |
| print("🎉 Traitement terminé !") | |
| print(f"Résultat stocké dans la table '{TABLE_OUTPUT}'") | |
| # | |
| if __name__ == "__main__": | |
| # Initialisations | |
| print("Initialisations ...") | |
| load_dotenv() | |
| HF_TOKEN = os.getenv('API_HF_TOKEN') | |
| # Constantes | |
| DATA_DIR = Path("../../Data") # dossier parent du script | |
| REPO_ID = "Loren/articles_database" # dataset HF | |
| PARQUET_DIR = DATA_DIR / "parquet_tables" | |
| REPO_ID = "Loren/articles_database" | |
| CACHE_DIR = "/tmp" | |
| TABLE_OUTPUT = "articles_checked" # Table de sortie | |
| BATCH_SIZE = 1000 | |
| MAX_CONCURRENCY = 100 | |
| TIMEOUT = 5 # secondes | |
| parquet_path = PARQUET_DIR / f"{TABLE_OUTPUT}.parquet" | |
| os.makedirs(CACHE_DIR, exist_ok=True) | |
| # Rediriger le cache HF globalement | |
| os.environ["HF_HOME"] = CACHE_DIR | |
| os.environ["HF_DATASETS_CACHE"] = CACHE_DIR | |
| os.environ["TRANSFORMERS_CACHE"] = CACHE_DIR | |
| # Téléchargement des fichiers Parquet depuis Hugging Face | |
| articles_parquet = hf_hub_download( | |
| repo_id=REPO_ID, | |
| filename="articles.parquet", | |
| repo_type="dataset", | |
| cache_dir=CACHE_DIR) | |
| # Connexion DuckDB en mémoire | |
| con = duckdb.connect() | |
| # Créer des tables DuckDB directement à partir des fichiers Parquet | |
| con.execute(f"CREATE VIEW articles AS SELECT * FROM parquet_scan('{articles_parquet}')") | |
| # Créer la table cible | |
| con.execute(f"""CREATE TABLE {TABLE_OUTPUT} AS | |
| SELECT *, NULL::BOOLEAN AS article_online | |
| FROM articles | |
| WHERE 1=0""") | |
| # Traitement principal | |
| asyncio.run(main()) | |
| # Sauvegarde du résultat dans un fichier Parquet | |
| con.execute(f"""COPY {TABLE_OUTPUT} TO '{parquet_path}' | |
| (FORMAT PARQUET, , COMPRESSION 'SNAPPY')""") | |
| print(f"✅ Fichier Parquet créé : {parquet_path}") | |
| # Upload des fichiers Parquet vers HF | |
| print(f"Uploading {parquet_path} ...") | |
| upload_file( | |
| path_or_fileobj=parquet_path, | |
| path_in_repo=f"{TABLE_OUTPUT}.parquet", | |
| repo_id=REPO_ID, | |
| repo_type="dataset", | |
| token=HF_TOKEN | |
| ) | |
| print("✅ Traitement terminé.") |