File size: 10,408 Bytes
f0806e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
import time
import pandas as pd
from collections import namedtuple
from tqdm import tqdm
import ast
from concurrent.futures import ThreadPoolExecutor
import os
import multiprocessing
from neo4j.exceptions import TransientError 

    
# --- Paramètres ---
BATCH_SIZE = 5000
NUM_THREADS = multiprocessing.cpu_count()
PROCESSED_IDS_FILE = "processed_ids.txt"


def reset_database(driver):
    """
    Efface TOUTES les données de la base Neo4j ET le fichier de suivi des IDs traités.
    À n'utiliser que pour une réinitialisation complète.
    """
    # Étape 1 : Vider la base de données
    with driver.session() as session:
        result = session.run("RETURN 1 AS test")
        print("Connexion OK, test result:", result.single()["test"])
        session.run("MATCH (n) DETACH DELETE n")
        
    # Étape 2 : Supprimer le fichier de suivi pour garantir une réimportation propre
    if os.path.exists(PROCESSED_IDS_FILE):
        os.remove(PROCESSED_IDS_FILE)


def parse_list_field(value):
    """
    Parse une chaîne de caractères qui représente une liste (ex: "['model1', 'model2']")
    en une véritable liste Python. Gère les cas où la valeur est simple ou vide.
    """
    if isinstance(value, str) and pd.notna(value):
        try:
            parsed = ast.literal_eval(value)
            if isinstance(parsed, list):
                return parsed
        except Exception:
            pass
    return [value] if value else []

def load_processed_ids():
    """Charge l'ensemble des IDs déjà traités depuis le fichier de suivi."""
    if os.path.exists(PROCESSED_IDS_FILE):
        with open(PROCESSED_IDS_FILE, "r", encoding="utf-8") as f:
            return set(line.strip() for line in f)
    return set()

def append_processed_ids(ids):
    """Ajoute une liste d'IDs au fichier de suivi."""
    with open(PROCESSED_IDS_FILE, "a", encoding="utf-8") as f:
        for i in ids:
            f.write(f"{i}\n")

def run_with_retry(session, query, parameters=None, retries=3, delay=1):
    """
    Exécute une requête Cypher avec une logique de réessai en cas d'erreur transitoire
    """
    for attempt in range(retries):
        try:
            session.run(query, parameters)
            return
        except TransientError as e:
            if attempt < retries - 1:
                time.sleep(delay)
                continue
            else:
                raise

def process_batch(rows, fieldnames, driver):
    """
    Traite un lot (batch) de lignes du CSV et les insère dans Neo4j.
    C'est la fonction "worker" qui sera exécutée en parallèle.
    """
    normalized_fields = [f.strip().replace(" ", "_").replace("-", "_") for f in fieldnames]
    Row = namedtuple("Row", normalized_fields)
    ids_successfully_processed = []

    with driver.session() as session:
        for row in rows:
            obj = Row(**{k: v if v != "" else None for k, v in row.items()})
            data = obj._asdict()

            if not data.get("id") or pd.isna(data.get("id")) or pd.isna(data.get("author")) or str(data.get("id")).strip() == "":
                continue  # Ignore si l'ID est manquant

            base_models = parse_list_field(data.get("base_model"))
            base_model_rels = parse_list_field(data.get("base_model_relation"))
            datasets = parse_list_field(data.get("dataset"))
            orgs_author_model = parse_list_field(data.get("organizations_author_model"))
            orgs_author_dataset = parse_list_field(data.get("organizations_author_dataset"))
            # Insertion du modèle et de son auteur
            if data.get("id") and data.get("author"):
                run_with_retry(session, """
                    MERGE (m:Model {name: $id})
                    SET m.downloads = $downloadsAllTime,
                        m.task = $pipeline_tag,
                        m.createdAt = $createdAt, 
                        m.parameters = $total_parameters_formatted,
                        m.likes = $likes,
                        m.license = $license
                    MERGE (a:Author {name: $author})
                    SET a.type = $author_type,
                        a.followers = $followers_count_author_model
                    WITH a
                    MATCH (m:Model {name: $id})
                    MERGE (a)-[p:POSTED]->(m)
                    SET p.name = "A publié"
                """, data)

                # Lien entre l’auteur et ses organisations
                orgs_data = [
                    {"org": org, "author": data["author"]}
                    for org in orgs_author_model
                    if pd.notna(org) and data.get("author")
                ]
                if orgs_data:
                    run_with_retry(session, """
                        UNWIND $orgs_data AS row
                        MERGE (o:Author {name: row.org})
                        WITH o, row
                        MATCH (a:Author {name: row.author})
                        MERGE (a)-[r:IS_IN]->(o)
                        SET r.name = "Fait partie de cette organisation", a.type = "personne",o.type = "organisation"
                    """, {"orgs_data": orgs_data})

                # Lien entre modèles et base models
                
                if len(base_models) == len(base_model_rels)  :
                    base_model_data = [
                    {"bm": bm, "id": data["id"], "rel": rel}
                    for bm, rel in zip(base_models, base_model_rels)
                    if pd.notna(bm) and data.get("id")
                ]
                elif len(base_models) >len(base_model_rels) :
                    if base_model_rels==['merge'] : 
                        base_model_data = [
                            {"bm": bm, "id": data["id"], "rel": "merge"}
                            for bm in base_models
                            if pd.notna(bm) and data.get("id")
                        ]
                    else : 
                        base_model_data = [
                    {"bm": bm, "id": data["id"], "rel": "A généré"}
                    for bm in base_models
                    if pd.notna(bm) and data.get("id")
                ]

                if base_model_data:
                    run_with_retry(session, """
                        UNWIND $base_model_data AS row
                        MERGE (bm:Model {name: row.bm})
                        WITH bm, row
                        MATCH (m:Model {name: row.id})
                        MERGE (bm)-[r:USED_IN]->(m)
                        SET r.name = row.rel
                    """, {"base_model_data": base_model_data})

                # Lien entre modèles et datasets
                datasets_data = [
                    {"ds": ds, "downloads": data.get("downloads_dataset"),
                    "createdAt_dataset": data.get("createdAt_dataset"), "id": data["id"]}
                    for ds in datasets
                    if pd.notna(ds) and data.get("id")
                ]
                if datasets_data and data.get("author_dataset") and data.get("dataset") and pd.notna(data.get("author_dataset")):
                    run_with_retry(session, """
                        UNWIND $datasets_data AS row
                        MERGE (d:Dataset {name: row.ds})
                        SET d.downloads = row.downloads,
                            d.createdAt_dataset = row.createdAt_dataset
                        WITH d, row
                        MATCH (m:Model {name: row.id})
                        MERGE (d)-[r:USED_IN]->(m)
                        SET r.name = "A été utilisé dans ce modèle"
                    """, {"datasets_data": datasets_data})

                # Insertion de l’auteur du dataset
                    run_with_retry(session, """
                        MERGE (ad:Author {name: $author_dataset})
                        SET ad.type = $author_dataset_type,
                            ad.followers = $followers_count_author_dataset
                        WITH ad
                        MATCH (d:Dataset {name: $dataset})
                        MERGE (ad)-[r:POSTED]->(d)
                        SET r.name = "A publié"
                    """, data)

                # Lien entre l’auteur du dataset et ses organisations
                orgs_dataset_data = [
                    {"org": org, "author_dataset": data["author_dataset"]}
                    for org in orgs_author_dataset
                    if pd.notna(org) and data.get("author_dataset")
                ]
                if orgs_dataset_data and pd.notna(data.get("author_dataset")):
                    run_with_retry(session, """
                        UNWIND $orgs_data AS row
                        MERGE (o:Author {name: row.org})
                        WITH o, row
                        MATCH (ad:Author {name: row.author_dataset})
                        MERGE (ad)-[r:IS_IN]->(o)
                        SET r.name = "Fait partie de cette organisation",  ad.type = "personne",o.type = "organisation"
                    """, {"orgs_data": orgs_dataset_data})

            ids_successfully_processed.append(data["id"])

        if ids_successfully_processed:
            append_processed_ids(ids_successfully_processed)

# Insère les données depuis un CSV en parallèle, par lots
def insert_parallel(csv_file_path, driver, processed_ids):
    # Lecture et nettoyage via pandas
    df = pd.read_csv(csv_file_path)
    df = df.loc[:, ~df.columns.str.contains('^Unnamed')]
     # Supprimer les lignes où 'id' est NaN ou vide
    df = df[~df["id"].isnull()]
    df = df[df["id"].astype(str).str.strip() != ""]

    # Ne conserver que les lignes dont l'ID n’a pas encore été traitée
    df = df[~df["id"].isin(processed_ids)]

    records = df.to_dict(orient="records")
    fieldnames = list(df.columns)

    batch = []
    futures = []

    with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        for row in tqdm(records, desc="Lecture CSV"):
            batch.append(row)
            if len(batch) == BATCH_SIZE:
                futures.append(executor.submit(process_batch, batch.copy(), fieldnames, driver))
                batch = []

        if batch:
            futures.append(executor.submit(process_batch, batch.copy(), fieldnames, driver))

        for future in tqdm(futures, desc="Traitement parallélisé"):
            future.result()