| | import datetime |
| | import time |
| | import sys |
| | import json |
| | import traceback |
| | import requests |
| | import zipfile |
| | import uuid |
| | import os |
| | import io |
| | import re |
| | import subprocess |
| | import concurrent.futures |
| | import threading |
| | from io import StringIO, BytesIO |
| | from typing import List, Dict, Any |
| |
|
| | import pandas as pd |
| | import numpy as np |
| | import warnings |
| |
|
| | warnings.filterwarnings("ignore") |
| |
|
| | |
# Base-36 digit alphabet: 3GPP encodes each part of a version number
# (major.minor.patch, each <= 35) as a single character from this string.
chars = "0123456789abcdefghijklmnopqrstuvwxyz"

# Locks guarding shared state across the worker threads spawned in main().
print_lock = threading.Lock()  # serializes console/progress output
dict_lock = threading.Lock()   # guards indexed_specifications + processed_count
scope_lock = threading.Lock()  # NOTE(review): acquired nowhere in this file — confirm still needed

# Shared mutable state updated by process_specification() and main().
indexed_specifications = {}   # composite key string -> metadata dict
documents_by_spec_num = {}    # cache of extracted document contents (loaded/saved as JSON in a ZIP)
processed_count = 0           # specs processed so far (under dict_lock)
total_count = 0               # total specs to process (set once in main)

# Matches a table-of-contents line: a chapter number like "4", "4.1.2" or
# "5a.1", a TAB, then the chapter title.
regex = r"^(\d+[a-z]?(?:\.\d+)*)\t[\ \S]+$"
| |
|
def _doc_bytes_to_lines(doc_bytes: bytes, ext: str) -> List[str]:
    """Convert .doc/.docx bytes to a list of non-empty, stripped text lines.

    Writes the bytes to a temp file, converts it to .txt with headless
    LibreOffice, reads the result, and always removes both temp files
    (the original leaked them when the conversion raised).
    """
    temp_id = str(uuid.uuid4())
    input_path = f"/tmp/{temp_id}.{ext}"
    output_path = f"/tmp/{temp_id}.txt"
    try:
        with open(input_path, "wb") as f:
            f.write(doc_bytes)

        subprocess.run([
            "libreoffice",
            "--headless",
            "--convert-to", "txt",
            "--outdir", "/tmp",
            input_path
        ], check=True)

        # utf-8 explicitly: the original used it in only one of its two
        # duplicated branches, making results locale-dependent in the other.
        with open(output_path, "r", encoding="utf-8") as f:
            return [line.strip() for line in f if line.strip()]
    finally:
        for path in (input_path, output_path):
            try:
                os.remove(path)
            except FileNotFoundError:
                pass


def get_text(specification: str, version: str):
    """Download a 3GPP spec archive and return its text as a list of lines.

    Fetches ``{spec}-{version}.zip`` from the 3GPP FTP archive, finds the
    first usable .doc/.docx member — descending into one level of nested
    ZIP if present, and skipping cover pages — and converts it to text.

    Raises:
        Exception: if the download fails or no .doc/.docx member is found.
    """
    doc_id = specification
    series = doc_id.split(".")[0]

    # NOTE(review): verify=False disables TLS certificate validation — confirm intentional.
    response = requests.get(
        f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version}.zip",
        verify=False,
        headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    )

    if response.status_code != 200:
        raise Exception(f"Téléchargement du ZIP échoué pour {specification}-{version}")

    with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
        for file_name in zf.namelist():
            if file_name.endswith(".zip"):
                print("Another ZIP !")
                # Use a separate ZipFile object for the nested archive; the
                # original rebound ``zf`` here, so later iterations of the
                # outer loop read members from the wrong archive.
                with zipfile.ZipFile(io.BytesIO(zf.read(file_name))) as inner:
                    for inner_name in inner.namelist():
                        if inner_name.endswith((".doc", ".docx")):
                            if "cover" in inner_name.lower():
                                print("COVER !")
                                continue
                            ext = inner_name.split(".")[-1]
                            return _doc_bytes_to_lines(inner.read(inner_name), ext)
            elif file_name.endswith((".doc", ".docx")):
                if "cover" in file_name.lower():
                    print("COVER !")
                    continue
                ext = file_name.split(".")[-1]
                return _doc_bytes_to_lines(zf.read(file_name), ext)

    raise Exception(f"Aucun fichier .doc/.docx trouvé dans le ZIP pour {specification}-{version}")
| |
|
def get_spec_content(specification: str, version: str):
    """Split a specification's text into a {chapter_title: content} dict.

    Strategy: the word "Foreword" appears first in the table of contents and
    then again as the actual section heading, so text[forewords[0]:forewords[1]]
    is the TOC. Body lines that look like chapter headings AND appear inside
    the TOC are taken as real chapter boundaries; the text between consecutive
    boundaries becomes each chapter's content (whitespace-normalized).

    Raises:
        Exception: if fewer than two "Foreword" occurrences are found
        (the original crashed with an IndexError in that case).
    """
    text = get_text(specification, version)

    # Locate the first two "Foreword" occurrences.
    forewords = []
    for idx, line in enumerate(text):
        if "Foreword" in line:
            forewords.append(idx)
            if len(forewords) >= 2:
                break

    if len(forewords) < 2:
        raise Exception(f"Structure inattendue (moins de deux 'Foreword') pour {specification}-{version}")

    # Hoisted out of the loop: the original rebuilt this slice for every
    # candidate line, which was accidentally quadratic.
    toc_region = text[forewords[0]:forewords[1]]

    chapters = []
    for line in text[forewords[1]:]:
        # Keep lines shaped like "1.2.3<TAB>Title" that also appear in the TOC.
        if re.search(regex, line) and any(line in c for c in toc_region):
            chapters.append(line)
            print(line)

    # Index of each chapter heading in the full text (first occurrence).
    real_toc_indexes = {chapter: text.index(chapter) for chapter in chapters}

    document = {}
    toc = list(real_toc_indexes.keys())
    index_toc = list(real_toc_indexes.values())

    # No recognizable chapters: return an empty document instead of crashing
    # on toc[0] below.
    if not toc:
        return document

    curr_index = 0
    for x in range(1, len(toc)):
        # Content of chapter curr_index runs up to the next chapter heading.
        document[toc[curr_index].replace("\t", " ")] = re.sub(
            r"[\ \t]+", " ", "\n".join(text[index_toc[curr_index] + 1:index_toc[x]])
        )
        curr_index = x

    # Last chapter runs to the end of the document.
    document[toc[curr_index].replace("\t", " ")] = re.sub(
        r"\s+", " ", " ".join(text[index_toc[curr_index] + 1:])
    )
    print(len(toc) - 1, toc[curr_index], curr_index)
    return document
| |
|
def process_specification(spec: Dict[str, Any], columns: List[str]) -> None:
    """Build metadata for one specification and record it in the shared index.

    Intended to run on a worker thread: updates to the shared dict/counter
    and console output are guarded by module-level locks. Any per-spec
    failure is logged and swallowed so one bad row cannot kill the batch.

    Args:
        spec: one row of the 3GPP status report as a dict
              (keys include 'spec_num', 'title', 'type', 'vers', 'WG').
        columns: column labels of the source table — currently unused,
                 kept for interface compatibility with callers.
    """
    global processed_count, indexed_specifications, documents_by_spec_num

    try:
        if spec.get('vers', None) is None:
            return  # No version available -> nothing to index.

        doc_id = str(spec["spec_num"])
        series = doc_id.split(".")[0]
        major, minor, patch = str(spec["vers"]).split(".")

        if int(major) > 35 or int(minor) > 35 or int(patch) > 35:
            # Too large for one base-36 digit each: two-digit decimal scheme.
            # str.zfill replaces the original hand-rolled padding loops.
            version_code = f"{major.zfill(2)}{minor.zfill(2)}{patch.zfill(2)}"
        else:
            # Classic scheme: one base-36 character per version component.
            version_code = f"{chars[int(major)]}{chars[int(minor)]}{chars[int(patch)]}"

        spec_url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"

        # Composite key used as the index entry's identifier.
        string = f"{spec['spec_num']}+-+{spec['title']}+-+{spec['type']}+-+{spec['vers']}+-+{spec['WG']}+-+Rel-{spec['vers'].split('.')[0]}"

        metadata = {
            "id": str(spec["spec_num"]),
            "title": spec["title"],
            "type": spec["type"],
            "release": str(spec["vers"].split(".")[0]),
            "version": str(spec["vers"]),
            "working_group": spec["WG"],
            "url": spec_url
        }

        with dict_lock:
            indexed_specifications[string] = metadata
            processed_count += 1

        with print_lock:
            sys.stdout.write(f"\rTraitement: {processed_count}/{total_count} spécifications")
            sys.stdout.flush()

    except Exception as e:
        with print_lock:
            print(f"\nErreur lors du traitement de {spec.get('spec_num', 'inconnu')}: {str(e)}")
| |
|
def main():
    """Fetch the 3GPP status report, index every specification, persist results.

    Pipeline: download the HTML status report, parse its tables with pandas,
    flatten the rows into dicts, fan them out to process_specification() on a
    thread pool, and — even on failure — save documents_by_spec_num back into
    indexed_docs_content.zip.
    """
    # Declared once at the top; the original issued ``global`` mid-loop.
    global total_count, documents_by_spec_num
    start_time = time.time()

    print("Récupération des spécifications depuis 3GPP...")
    # NOTE(review): verify=False disables TLS certificate validation — confirm intentional.
    response = requests.get(
        f'https://www.3gpp.org/dynareport?code=status-report.htm',
        headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'},
        verify=False
    )

    # Parse every HTML table in the report.
    dfs = pd.read_html(
        StringIO(response.text),
        storage_options={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'},
        encoding="utf-8"
    )

    # NaN -> None so downstream dict handling sees plain Python values.
    for x in range(len(dfs)):
        dfs[x] = dfs[x].replace({np.nan: None})

    # Keep the first five columns of each table; labels come from the first
    # table, with non-breaking spaces normalized to underscores.
    columns_needed = [0, 1, 2, 3, 4]
    extracted_dfs = [df.iloc[:, columns_needed] for df in dfs]
    columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]

    # Flatten every table row into a {column: value} dict.
    specifications = []
    for df in extracted_dfs:
        for index, row in df.iterrows():
            specifications.append(dict(zip(columns, row.to_list())))

    total_count = len(specifications)
    print(f"Traitement de {total_count} spécifications avec multithreading...")

    try:
        # Warm the document cache from a previous run, if present. Opening
        # the archive by path inside ``with`` fixes the original's leaked
        # raw file handle from zipfile.ZipFile(open(..., "rb")).
        if os.path.exists("indexed_docs_content.zip"):
            with zipfile.ZipFile("indexed_docs_content.zip") as zf:
                for file_name in zf.namelist():
                    if file_name.endswith(".json"):
                        documents_by_spec_num = json.loads(zf.read(file_name).decode("utf-8"))
                        print(f"Chargement de {len(documents_by_spec_num)} documents depuis le cache.")

        with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
            futures = [executor.submit(process_specification, spec, columns) for spec in specifications]
            concurrent.futures.wait(futures)

    finally:
        # Always persist whatever was accumulated, even on interruption.
        json_str = json.dumps(documents_by_spec_num, indent=4, ensure_ascii=False)
        json_bytes = json_str.encode("utf-8")
        with zipfile.ZipFile("indexed_docs_content.zip", "w", compression=zipfile.ZIP_DEFLATED) as archive:
            archive.writestr("indexed_documents.json", json_bytes)
        elapsed_time = time.time() - start_time
        print(f"\nTraitement terminé en {elapsed_time:.2f} secondes")
        print(f"Résultats sauvegardés dans l'archive ZIP")
| |
|
# Script entry point: run the full scrape-and-index pipeline.
if __name__ == "__main__":
    main()
| |
|