Spaces:
Sleeping
Sleeping
File size: 6,092 Bytes
5327a45 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
##############################################################################################
### Script de vérification des url des articles
###
### Ce script
### - charge la table articles depuis le dataset HF Loren/articles_database
### - check si les url des articles sont des pages actives (status code 200)
### - ajoute cette information dans une colonne article_online
### - crée les fichiers Parquet compressés à partir de la table créée
### - l'upload dans le dataset HF Loren/articles_database
###
### 👉 Il peut alors être utilisé par un space Hugging Face
##############################################################################################
import os
from dotenv import load_dotenv
from huggingface_hub import hf_hub_download, upload_file
from pathlib import Path
from typing import List, Any, Tuple
import duckdb
import asyncio
import aiohttp
from tqdm.asyncio import tqdm_asyncio
# Fonctions
async def check_url(session: aiohttp.ClientSession, sem: asyncio.Semaphore, url: str) -> bool:
"""
Vérifie si une URL est accessible (retourne un code HTTP < 400).
Args:
session (aiohttp.ClientSession): Session HTTP réutilisable pour les requêtes.
sem (asyncio.Semaphore): Sémaphore pour limiter le nombre de requêtes simultanées.
url (str): L'URL à vérifier.
Returns:
bool: True si l’URL répond avec un code < 400, sinon False.
"""
if not url:
return False
async with sem: # limite de concurrence
try:
async with session.head(url, allow_redirects=True, timeout=TIMEOUT) as resp:
return resp.status < 400
except Exception:
return False
async def process_batch(batch: List[Tuple[Any, ...]]) -> List[bool]:
"""
Traite un batch d’URLs avec une limite de requêtes simultanées.
Args:
batch (List[Tuple[Any, ...]]): Liste de tuples représentant les lignes d'articles.
Chaque tuple doit contenir au moins une colonne d’URL à l’index 3.
Returns:
List[bool]: Liste de statuts (True/False) correspondant à l’accessibilité de chaque URL.
"""
sem = asyncio.Semaphore(MAX_CONCURRENCY)
async with aiohttp.ClientSession() as session:
tasks = [
check_url(session, sem, row[3]) # row[3] = article_url
for row in batch
]
return await tqdm_asyncio.gather(*tasks)
async def main() -> None:
"""
Exécute le traitement complet :
- Récupère les articles par batch depuis la base.
- Vérifie la disponibilité des URLs.
- Insère les résultats enrichis dans une table de sortie.
Returns:
None
"""
total_rows = con.execute("SELECT COUNT(*) FROM articles").fetchone()[0]
total_batches = (total_rows + BATCH_SIZE - 1) // BATCH_SIZE
print(f"🔍 {total_rows} lignes à traiter ({total_batches} batchs de {BATCH_SIZE})")
for batch_index in range(total_batches):
offset = batch_index * BATCH_SIZE
# Charger un batch depuis la table
batch = con.execute(f"""
SELECT * FROM articles
LIMIT {BATCH_SIZE} OFFSET {offset}
""").fetchall()
# Vérifier les URLs
online_statuses = await process_batch(batch)
# Préparer les données enrichies
enriched_rows = [
(*row, status)
for row, status in zip(batch, online_statuses)
]
# Insérer dans la table physique
con.executemany(f"""
INSERT INTO {TABLE_OUTPUT} VALUES (?, ?, ?, ?, ?, ?, ?)
""", enriched_rows)
print(f"✅ Batch {batch_index + 1}/{total_batches} traité ({len(batch)} lignes)")
print("🎉 Traitement terminé !")
print(f"Résultat stocké dans la table '{TABLE_OUTPUT}'")
#
if __name__ == "__main__":
# Initialisations
print("Initialisations ...")
load_dotenv()
HF_TOKEN = os.getenv('API_HF_TOKEN')
# Constantes
DATA_DIR = Path("../../Data") # dossier parent du script
REPO_ID = "Loren/articles_database" # dataset HF
PARQUET_DIR = DATA_DIR / "parquet_tables"
REPO_ID = "Loren/articles_database"
CACHE_DIR = "/tmp"
TABLE_OUTPUT = "articles_checked" # Table de sortie
BATCH_SIZE = 1000
MAX_CONCURRENCY = 100
TIMEOUT = 5 # secondes
parquet_path = PARQUET_DIR / f"{TABLE_OUTPUT}.parquet"
os.makedirs(CACHE_DIR, exist_ok=True)
# Rediriger le cache HF globalement
os.environ["HF_HOME"] = CACHE_DIR
os.environ["HF_DATASETS_CACHE"] = CACHE_DIR
os.environ["TRANSFORMERS_CACHE"] = CACHE_DIR
# Téléchargement des fichiers Parquet depuis Hugging Face
articles_parquet = hf_hub_download(
repo_id=REPO_ID,
filename="articles.parquet",
repo_type="dataset",
cache_dir=CACHE_DIR)
# Connexion DuckDB en mémoire
con = duckdb.connect()
# Créer des tables DuckDB directement à partir des fichiers Parquet
con.execute(f"CREATE VIEW articles AS SELECT * FROM parquet_scan('{articles_parquet}')")
# Créer la table cible
con.execute(f"""CREATE TABLE {TABLE_OUTPUT} AS
SELECT *, NULL::BOOLEAN AS article_online
FROM articles
WHERE 1=0""")
# Traitement principal
asyncio.run(main())
# Sauvegarde du résultat dans un fichier Parquet
con.execute(f"""COPY {TABLE_OUTPUT} TO '{parquet_path}'
(FORMAT PARQUET, , COMPRESSION 'SNAPPY')""")
print(f"✅ Fichier Parquet créé : {parquet_path}")
# Upload des fichiers Parquet vers HF
print(f"Uploading {parquet_path} ...")
upload_file(
path_or_fileobj=parquet_path,
path_in_repo=f"{TABLE_OUTPUT}.parquet",
repo_id=REPO_ID,
repo_type="dataset",
token=HF_TOKEN
)
print("✅ Traitement terminé.") |