Almaatla committed on
Commit
7506aab
·
verified ·
1 Parent(s): 7f043e9

Upload classes.py

Browse files
Files changed (1) hide show
  1. classes.py +76 -26
classes.py CHANGED
@@ -196,17 +196,24 @@ class TDocIndexer:
196
  self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])
197
 
198
  # Utiliser ThreadPoolExecutor pour traiter les réunions en parallèle
199
- with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
200
- futures = [executor.submit(self.process_meeting, meeting, wg_url)
201
- for meeting in meeting_folders if meeting not in ['./', '../']]
202
-
203
- total = len(futures)
204
- done_count = 0
205
- yield f"event: get-maximum\ndata: {total}\n\n"
206
 
 
207
  for future in concurrent.futures.as_completed(futures):
208
  done_count += 1
209
  yield f"event: progress\ndata: {done_count}\n\n"
 
 
 
 
 
 
210
 
211
  def index_all_tdocs(self):
212
  """Indexer tous les documents ZIP dans la structure FTP 3GPP avec multithreading"""
@@ -257,17 +264,24 @@ class TDocIndexer:
257
  self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])
258
 
259
  # Utiliser ThreadPoolExecutor pour traiter les réunions en parallèle
260
- with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
261
- futures = [executor.submit(self.process_meeting, meeting, main_url, workshop=True)
262
- for meeting in meeting_folders if meeting not in ['./', '../']]
263
- total = len(futures)
264
- done_count = 0
265
 
266
- yield f"event: get-maximum\ndata: {total}\n\n"
267
 
 
268
  for future in concurrent.futures.as_completed(futures):
269
  done_count += 1
270
  yield f"event: progress\ndata: {done_count}\n\n"
 
 
 
 
 
 
271
 
272
  docs_count_after = len(self.indexer)
273
  new_docs_count = docs_count_after - docs_count_before
@@ -287,6 +301,14 @@ class Spec3GPPIndexer:
287
  self.processed_count = 0
288
  self.total_count = 0
289
 
 
 
 
 
 
 
 
 
290
  self.DICT_LOCK = threading.Lock()
291
  self.DOCUMENT_LOCK = threading.Lock()
292
  self.STOP_EVENT = threading.Event()
@@ -369,7 +391,7 @@ class Spec3GPPIndexer:
369
  '--outdir', tmpdir,
370
  extracted_path
371
  ]
372
- subprocess.run(cmd, check=True, timeout=60*5, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
373
 
374
  txt_file = os.path.splitext(extracted_path)[0] + '.txt'
375
  if os.path.exists(txt_file):
@@ -382,6 +404,10 @@ class Spec3GPPIndexer:
382
 
383
  except Exception as e:
384
  print(f"Error getting text for {specification} v{version_code}: {e}")
 
 
 
 
385
  return []
386
 
387
  def get_spec_content(self, specification, version_code):
@@ -445,6 +471,12 @@ class Spec3GPPIndexer:
445
  self.processed_count += 1
446
  return
447
 
 
 
 
 
 
 
448
  document = None
449
  already_indexed = False
450
  needs_fetch = False
@@ -562,17 +594,24 @@ class Spec3GPPIndexer:
562
  specifications = self.fetch_spec_table()
563
  self.total_count = len(specifications)
564
  print(f"Processing {self.total_count} specs with {self.max_workers} threads...")
565
- with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
566
- futures = [executor.submit(self.process_specification, spec) for spec in specifications]
567
- total = len(futures)
568
- done_count = 0
569
- yield f"event: get-maximum\ndata: {total}\n\n"
570
 
 
571
  for future in concurrent.futures.as_completed(futures):
572
  done_count += 1
573
  yield f"event: progress\ndata: {done_count}\n\n"
574
  if self.STOP_EVENT.is_set():
575
  break
 
 
 
 
 
 
576
  print("All specs processed.")
577
 
578
  # Sauvegarde (identique au script original)
@@ -591,7 +630,12 @@ class Spec3GPPIndexer:
591
  print("Pushing ...")
592
  push_spec_content.push_to_hub("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF"])
593
  push_spec_metadata.push_to_hub("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF"])
594
-
 
 
 
 
 
595
  self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list()
596
  self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
597
  print("Save finished.")
@@ -830,18 +874,24 @@ class SpecETSIIndexer:
830
  self.total_count = len(specifications)
831
  print(f"Traitement de {self.total_count} specs avec {self.max_workers} threads...\n")
832
 
833
- with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
834
- futures = [executor.submit(self.process_specification, spec) for spec in specifications]
835
- total = len(futures)
836
- done_count = 0
837
- yield f"event: get-maximum\ndata: {total}\n\n"
838
 
 
839
  for future in concurrent.futures.as_completed(futures):
840
  done_count += 1
841
  yield f"event: progress\ndata: {done_count}\n\n"
842
  if self.STOP_EVENT.is_set():
843
  break
844
-
 
 
 
 
 
845
  print(f"\nAll {self.processed_count}/{self.total_count} specs processed.")
846
 
847
  def save(self):
 
196
  self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])
197
 
198
  # Utiliser ThreadPoolExecutor pour traiter les réunions en parallèle
199
+ executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
200
+ futures = [executor.submit(self.process_meeting, meeting, wg_url)
201
+ for meeting in meeting_folders if meeting not in ['./', '../']]
202
+
203
+ total = len(futures)
204
+ done_count = 0
205
+ yield f"event: get-maximum\ndata: {total}\n\n"
206
 
207
+ try:
208
  for future in concurrent.futures.as_completed(futures):
209
  done_count += 1
210
  yield f"event: progress\ndata: {done_count}\n\n"
211
+ except GeneratorExit:
212
+ for f in futures:
213
+ f.cancel()
214
+ executor.shutdown(wait=False, cancel_futures=True)
215
+ return
216
+ executor.shutdown(wait=False)
217
 
218
  def index_all_tdocs(self):
219
  """Indexer tous les documents ZIP dans la structure FTP 3GPP avec multithreading"""
 
264
  self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])
265
 
266
  # Utiliser ThreadPoolExecutor pour traiter les réunions en parallèle
267
+ executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
268
+ futures = [executor.submit(self.process_meeting, meeting, main_url, workshop=True)
269
+ for meeting in meeting_folders if meeting not in ['./', '../']]
270
+ total = len(futures)
271
+ done_count = 0
272
 
273
+ yield f"event: get-maximum\ndata: {total}\n\n"
274
 
275
+ try:
276
  for future in concurrent.futures.as_completed(futures):
277
  done_count += 1
278
  yield f"event: progress\ndata: {done_count}\n\n"
279
+ except GeneratorExit:
280
+ for f in futures:
281
+ f.cancel()
282
+ executor.shutdown(wait=False, cancel_futures=True)
283
+ return
284
+ executor.shutdown(wait=False)
285
 
286
  docs_count_after = len(self.indexer)
287
  new_docs_count = docs_count_after - docs_count_before
 
301
  self.processed_count = 0
302
  self.total_count = 0
303
 
304
+ try:
305
+ self.failed_specifications = set(
306
+ item["spec_id"] for item in load_dataset("OrganizedProgrammers/3GPPFailedSpecs")["train"].to_list()
307
+ )
308
+ print(f"Loaded {len(self.failed_specifications)} previously failed specifications")
309
+ except (EmptyDatasetError, Exception):
310
+ self.failed_specifications = set()
311
+
312
  self.DICT_LOCK = threading.Lock()
313
  self.DOCUMENT_LOCK = threading.Lock()
314
  self.STOP_EVENT = threading.Event()
 
391
  '--outdir', tmpdir,
392
  extracted_path
393
  ]
394
+ subprocess.run(cmd, check=True, timeout=60*2, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
395
 
396
  txt_file = os.path.splitext(extracted_path)[0] + '.txt'
397
  if os.path.exists(txt_file):
 
404
 
405
  except Exception as e:
406
  print(f"Error getting text for {specification} v{version_code}: {e}")
407
+ if isinstance(e, (subprocess.TimeoutExpired, subprocess.CalledProcessError)):
408
+ with self.DICT_LOCK:
409
+ self.failed_specifications.add(specification)
410
+ print(f"Spec {specification}: marked as failed for future indexation runs")
411
  return []
412
 
413
  def get_spec_content(self, specification, version_code):
 
471
  self.processed_count += 1
472
  return
473
 
474
+ if doc_id in self.failed_specifications:
475
+ with self.DICT_LOCK:
476
+ self.processed_count += 1
477
+ print(f"Spec {doc_id} ({spec.get('title', '')}): skipped (previously failed) - Progress {self.processed_count}/{self.total_count}")
478
+ return
479
+
480
  document = None
481
  already_indexed = False
482
  needs_fetch = False
 
594
  specifications = self.fetch_spec_table()
595
  self.total_count = len(specifications)
596
  print(f"Processing {self.total_count} specs with {self.max_workers} threads...")
597
+ executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
598
+ futures = [executor.submit(self.process_specification, spec) for spec in specifications]
599
+ total = len(futures)
600
+ done_count = 0
601
+ yield f"event: get-maximum\ndata: {total}\n\n"
602
 
603
+ try:
604
  for future in concurrent.futures.as_completed(futures):
605
  done_count += 1
606
  yield f"event: progress\ndata: {done_count}\n\n"
607
  if self.STOP_EVENT.is_set():
608
  break
609
+ except GeneratorExit:
610
+ for f in futures:
611
+ f.cancel()
612
+ executor.shutdown(wait=False, cancel_futures=True)
613
+ return
614
+ executor.shutdown(wait=False)
615
  print("All specs processed.")
616
 
617
  # Sauvegarde (identique au script original)
 
630
  print("Pushing ...")
631
  push_spec_content.push_to_hub("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF"])
632
  push_spec_metadata.push_to_hub("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF"])
633
+
634
+ if self.failed_specifications:
635
+ failed_list = [{"spec_id": spec_id} for spec_id in sorted(self.failed_specifications)]
636
+ Dataset.from_list(failed_list).push_to_hub("OrganizedProgrammers/3GPPFailedSpecs", token=os.environ["HF"])
637
+ print(f"Saved {len(failed_list)} failed specifications to 3GPPFailedSpecs")
638
+
639
  self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list()
640
  self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
641
  print("Save finished.")
 
874
  self.total_count = len(specifications)
875
  print(f"Traitement de {self.total_count} specs avec {self.max_workers} threads...\n")
876
 
877
+ executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
878
+ futures = [executor.submit(self.process_specification, spec) for spec in specifications]
879
+ total = len(futures)
880
+ done_count = 0
881
+ yield f"event: get-maximum\ndata: {total}\n\n"
882
 
883
+ try:
884
  for future in concurrent.futures.as_completed(futures):
885
  done_count += 1
886
  yield f"event: progress\ndata: {done_count}\n\n"
887
  if self.STOP_EVENT.is_set():
888
  break
889
+ except GeneratorExit:
890
+ for f in futures:
891
+ f.cancel()
892
+ executor.shutdown(wait=False, cancel_futures=True)
893
+ return
894
+ executor.shutdown(wait=False)
895
  print(f"\nAll {self.processed_count}/{self.total_count} specs processed.")
896
 
897
  def save(self):