File size: 6,092 Bytes
5327a45
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
##############################################################################################
### Script de vérification des url des articles
###
### Ce script
###    - charge la table articles depuis le dataset HF Loren/articles_database
###    - check si les url des articles sont des pages actives (status code 200)
###    - ajoute cette information dans une colonne article_online
###    - crée les fichiers Parquet compressés à partir de la table créée
###    - l'upload dans le dataset HF Loren/articles_database
###
### 👉 Il peut alors être utilisé par un space Hugging Face
##############################################################################################

import os
from dotenv import load_dotenv
from huggingface_hub import hf_hub_download, upload_file
from pathlib import Path
from typing import List, Any, Tuple
import duckdb
import asyncio
import aiohttp
from tqdm.asyncio import tqdm_asyncio

# Fonctions
async def check_url(session: aiohttp.ClientSession, sem: asyncio.Semaphore, url: str) -> bool:
    """

    Vérifie si une URL est accessible (retourne un code HTTP < 400).



    Args:

        session (aiohttp.ClientSession): Session HTTP réutilisable pour les requêtes.

        sem (asyncio.Semaphore): Sémaphore pour limiter le nombre de requêtes simultanées.

        url (str): L'URL à vérifier.



    Returns:

        bool: True si l’URL répond avec un code < 400, sinon False.

    """
    if not url:
        return False

    async with sem:  # limite de concurrence
        try:
            async with session.head(url, allow_redirects=True, timeout=TIMEOUT) as resp:
                return resp.status < 400
        except Exception:
            return False


async def process_batch(batch: List[Tuple[Any, ...]]) -> List[bool]:
    """

    Traite un batch d’URLs avec une limite de requêtes simultanées.



    Args:

        batch (List[Tuple[Any, ...]]): Liste de tuples représentant les lignes d'articles.

            Chaque tuple doit contenir au moins une colonne d’URL à l’index 3.



    Returns:

        List[bool]: Liste de statuts (True/False) correspondant à l’accessibilité de chaque URL.

    """
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        tasks = [
            check_url(session, sem, row[3])  # row[3] = article_url
            for row in batch
        ]
        return await tqdm_asyncio.gather(*tasks)


async def main() -> None:
    """

    Exécute le traitement complet :

    - Récupère les articles par batch depuis la base.

    - Vérifie la disponibilité des URLs.

    - Insère les résultats enrichis dans une table de sortie.



    Returns:

        None

    """
    total_rows = con.execute("SELECT COUNT(*) FROM articles").fetchone()[0]
    total_batches = (total_rows + BATCH_SIZE - 1) // BATCH_SIZE

    print(f"🔍 {total_rows} lignes à traiter ({total_batches} batchs de {BATCH_SIZE})")

    for batch_index in range(total_batches):
        offset = batch_index * BATCH_SIZE

        # Charger un batch depuis la table
        batch = con.execute(f"""

            SELECT * FROM articles

            LIMIT {BATCH_SIZE} OFFSET {offset}

        """).fetchall()

        # Vérifier les URLs
        online_statuses = await process_batch(batch)

        # Préparer les données enrichies
        enriched_rows = [
            (*row, status)
            for row, status in zip(batch, online_statuses)
        ]

        # Insérer dans la table physique
        con.executemany(f"""

            INSERT INTO {TABLE_OUTPUT} VALUES (?, ?, ?, ?, ?, ?, ?)

        """, enriched_rows)

        print(f"✅ Batch {batch_index + 1}/{total_batches} traité ({len(batch)} lignes)")

    print("🎉 Traitement terminé !")
    print(f"Résultat stocké dans la table '{TABLE_OUTPUT}'")
#

if __name__ == "__main__":
    # Initialisations
    print("Initialisations ...")
    load_dotenv()
    HF_TOKEN = os.getenv('API_HF_TOKEN')

    # Constantes
    DATA_DIR = Path("../../Data")   # dossier parent du script
    REPO_ID = "Loren/articles_database"  # dataset HF
    PARQUET_DIR = DATA_DIR / "parquet_tables"
    REPO_ID = "Loren/articles_database"
    CACHE_DIR = "/tmp"
    TABLE_OUTPUT = "articles_checked"  # Table de sortie
    BATCH_SIZE = 1000
    MAX_CONCURRENCY = 100
    TIMEOUT = 5  # secondes
    parquet_path = PARQUET_DIR / f"{TABLE_OUTPUT}.parquet"

    os.makedirs(CACHE_DIR, exist_ok=True)
    # Rediriger le cache HF globalement
    os.environ["HF_HOME"] = CACHE_DIR
    os.environ["HF_DATASETS_CACHE"] = CACHE_DIR
    os.environ["TRANSFORMERS_CACHE"] = CACHE_DIR

    # Téléchargement des fichiers Parquet depuis Hugging Face
    articles_parquet = hf_hub_download(
        repo_id=REPO_ID,
        filename="articles.parquet",
        repo_type="dataset",
        cache_dir=CACHE_DIR)

    # Connexion DuckDB en mémoire
    con = duckdb.connect()

    # Créer des tables DuckDB directement à partir des fichiers Parquet
    con.execute(f"CREATE VIEW articles AS SELECT * FROM parquet_scan('{articles_parquet}')")

    # Créer la table cible
    con.execute(f"""CREATE TABLE {TABLE_OUTPUT} AS

                        SELECT *, NULL::BOOLEAN AS article_online

                            FROM articles

                            WHERE 1=0""")

    # Traitement principal
    asyncio.run(main())

    # Sauvegarde du résultat dans un fichier Parquet
    con.execute(f"""COPY {TABLE_OUTPUT} TO '{parquet_path}'

                (FORMAT PARQUET, , COMPRESSION 'SNAPPY')""")

    print(f"✅ Fichier Parquet créé : {parquet_path}")

    # Upload des fichiers Parquet vers HF
    print(f"Uploading {parquet_path} ...")
    upload_file(
        path_or_fileobj=parquet_path,
        path_in_repo=f"{TABLE_OUTPUT}.parquet",
        repo_id=REPO_ID,
        repo_type="dataset",
        token=HF_TOKEN
    )

    print("✅ Traitement terminé.")