| | import shutil |
| | import bm25s |
| | from bm25s.hf import BM25HF |
| | import threading, re, time, concurrent.futures, requests, os, hashlib, traceback, io, zipfile, subprocess, tempfile, json, fitz |
| | import pandas as pd |
| | import numpy as np |
| |
|
| | from bs4 import BeautifulSoup |
| | from datasets import load_dataset, Dataset |
| | from datasets.data_files import EmptyDatasetError |
| | from dotenv import load_dotenv |
| |
|
# Load environment variables from a local .env file (HF token, ETSI EOL credentials).
load_dotenv()
| |
|
class TDocIndexer:
    """Crawl the 3GPP FTP server and maintain an index of TDoc ZIP archives.

    The index maps a TDoc id (e.g. ``S1-123456``) to the URL of its ZIP file
    and is persisted as the Hugging Face dataset
    ``OrganizedProgrammers/3GPPTDocLocation``.  Crawling is parallelised with
    a thread pool; the long-running entry points (``index_all_tdocs`` and
    ``index_all_workshops``) are generators yielding Server-Sent-Event
    formatted progress strings for a streaming front end.
    """

    def __init__(self, max_workers=33):
        # Number of documents present in the dataset when it was loaded.
        self.indexer_length = 0
        self.dataset = "OrganizedProgrammers/3GPPTDocLocation"

        # doc_id -> ZIP URL mapping, fetched from the HF dataset.
        self.indexer = self.load_indexer()
        self.main_ftp_url = "https://3gpp.org/ftp"
        # TDoc file names start with a group prefix: S1..S6/SP, C1..C6/CP, R1..R6/RP.
        self.valid_doc_pattern = re.compile(r'^(S[1-6P]|C[1-6P]|R[1-6P])-\d+', flags=re.IGNORECASE)
        self.max_workers = max_workers

        self.print_lock = threading.Lock()    # serialises console output across workers
        self.indexer_lock = threading.Lock()  # protects self.indexer and the counters below

        self.total_indexed = 0
        self.processed_count = 0
        self.total_count = 0

    def load_indexer(self):
        """Load the doc_id -> URL index from the Hugging Face dataset."""
        self.indexer_length = 0
        all_docs = {}
        tdoc_locations = load_dataset(self.dataset)
        tdoc_locations = tdoc_locations["train"].to_list()
        for doc in tdoc_locations:
            self.indexer_length += 1
            all_docs[doc["doc_id"]] = doc["url"]
        return all_docs

    def save_indexer(self):
        """Push the updated index to the Hugging Face hub and reload it."""
        data = []
        for doc_id, url in self.indexer.items():
            data.append({"doc_id": doc_id, "url": url})
        dataset = Dataset.from_list(data)
        dataset.push_to_hub(self.dataset, token=os.environ["HF"])
        self.indexer = self.load_indexer()

    def get_docs_from_url(self, url):
        """Return the link texts of an FTP directory-listing page.

        Returns an empty list on any network or HTTP error.
        """
        try:
            response = requests.get(url, verify=False, timeout=10)
            # Fix: do not parse an HTTP error page as a directory listing.
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            with self.print_lock:
                print(f"Erreur lors de l'accès à {url}: {e}")
            return []

    def is_valid_document_pattern(self, filename):
        """True if *filename* starts with a TDoc id (e.g. ``S1-123456``)."""
        return bool(self.valid_doc_pattern.match(filename))

    def is_zip_file(self, filename):
        """True if *filename* has a ``.zip`` extension (case-insensitive)."""
        return filename.lower().endswith('.zip')

    def extract_doc_id(self, filename):
        """Return the TDoc id embedded in *filename*, or None if it does not
        follow the TDoc naming pattern."""
        if self.is_valid_document_pattern(filename):
            # Strip the extension, then any "_<suffix>" revision marker.
            full_id = filename.split('.')[0]
            return full_id.split('_')[0]
        return None

    def process_zip_files(self, files_list, base_url, workshop=False):
        """Scan a directory listing and index every eligible ZIP file.

        When *workshop* is True, free-form file names (not matching the TDoc
        pattern) are accepted and the bare file name is used as the doc id.
        Returns the number of entries added or re-located in the index.
        """
        indexed_count = 0
        for file in files_list:
            # Skip navigation entries and the zip subfolder itself.
            if file in ['./', '../', 'ZIP/', 'zip/']:
                continue
            if self.is_zip_file(file) and (self.is_valid_document_pattern(file) or workshop):
                file_url = f"{base_url}/{file}"
                doc_id = self.extract_doc_id(file)
                if doc_id is None:
                    # Workshop archive with a free-form name: key by file stem.
                    doc_id = file.split('.')[0]
                if doc_id:
                    with self.indexer_lock:
                        # Already indexed at this exact URL: nothing to do.
                        if doc_id in self.indexer and self.indexer[doc_id] == file_url:
                            continue
                        self.indexer[doc_id] = file_url
                        indexed_count += 1
                        self.total_indexed += 1
        return indexed_count

    def process_meeting(self, meeting, wg_url, workshop=False):
        """Index one meeting folder: look for a docs/tdocs/tdoc subfolder and
        index its ZIP files, plus those of an optional ./zip subfolder.

        Returns 1 on success, 0 on error or for navigation entries.
        """
        try:
            if meeting in ['./', '../']:
                return 0

            meeting_url = f"{wg_url}/{meeting}"
            with self.print_lock:
                print(f"Vérification du meeting: {meeting}")

            meeting_contents = self.get_docs_from_url(meeting_url)

            # NOTE(review): the membership test is case-insensitive, but the
            # lowercase name is appended to the URL as-is; this relies on the
            # 3GPP web server resolving URLs case-insensitively — confirm.
            key = None
            if "docs" in [x.lower() for x in meeting_contents]:
                key = "docs"
            elif "tdocs" in [x.lower() for x in meeting_contents]:
                key = "tdocs"
            elif "tdoc" in [x.lower() for x in meeting_contents]:
                key = "tdoc"

            if key is not None:
                docs_url = f"{meeting_url}/{key}"
                with self.print_lock:
                    print(f"Vérification des documents présent dans {docs_url}")

                docs_files = self.get_docs_from_url(docs_url)
                docs_indexed_count = self.process_zip_files(docs_files, docs_url, workshop)
                if docs_indexed_count > 0:
                    with self.print_lock:
                        print(f"{docs_indexed_count} fichiers trouvés")

                # Some meetings store the archives one level deeper, in ./zip.
                if "zip" in [x.lower() for x in docs_files]:
                    zip_url = f"{docs_url}/zip"
                    with self.print_lock:
                        print(f"Vérification du dossier ./zip: {zip_url}")

                    zip_files = self.get_docs_from_url(zip_url)
                    zip_indexed_count = self.process_zip_files(zip_files, zip_url, workshop)
                    if zip_indexed_count > 0:
                        with self.print_lock:
                            print(f"{zip_indexed_count} fichiers trouvés")

            with self.indexer_lock:
                self.processed_count += 1

            with self.print_lock:
                progress = (self.processed_count / self.total_count) * 100 if self.total_count > 0 else 0
                print(f"\rProgression: {self.processed_count}/{self.total_count} réunions traitées ({progress:.1f}%)")

            return 1

        except Exception as e:
            with self.print_lock:
                print(f"\nErreur lors du traitement de la réunion {meeting}: {str(e)}")
            return 0

    def process_workgroup(self, wg, main_url):
        """Process every meeting of one working group in parallel.

        Generator: yields SSE-formatted maximum/progress events.
        """
        if wg in ['./', '../']:
            return

        wg_url = f"{main_url}/{wg}"
        with self.print_lock:
            print(f"Vérification du working group: {wg}")

        meeting_folders = self.get_docs_from_url(wg_url)
        self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_meeting, meeting, wg_url)
                       for meeting in meeting_folders if meeting not in ['./', '../']]

            total = len(futures)
            done_count = 0
            yield f"event: get-maximum\ndata: {total}\n\n"

            for _ in concurrent.futures.as_completed(futures):
                done_count += 1
                yield f"event: progress\ndata: {done_count}\n\n"

    def index_all_tdocs(self):
        """Index every TDoc ZIP of the TSG SA/CT/RAN trees.

        Generator: yields SSE info/progress events; the updated index is the
        generator's return value.
        """
        print("Démarrage de l'indexation des TDocs 3GPP complète")

        start_time = time.time()
        # Fix: compare like with like — the original diffed len(self.indexer)
        # against the stale load-time counter and hid the mismatch with abs().
        docs_count_before = len(self.indexer)

        main_groups = ["tsg_sa", "tsg_ct", "tsg_ran"]

        for main_tsg in main_groups:
            print(f"Indexation de {main_tsg.upper()}...")

            main_url = f"{self.main_ftp_url}/{main_tsg}"
            workgroups = self.get_docs_from_url(main_url)

            for wg in workgroups:
                yield f"event: info\ndata: {main_tsg}-{wg}\n\n"
                for content in self.process_workgroup(wg, main_url):
                    yield content

        docs_count_after = len(self.indexer)
        new_docs_count = docs_count_after - docs_count_before

        print(f"Indexation terminée en {time.time() - start_time:.2f} secondes")
        print(f"Nouveaux documents ZIP indexés: {new_docs_count}")
        print(f"Total des documents dans l'index: {docs_count_after}")

        return self.indexer

    def index_all_workshops(self):
        """Index the free-form ZIP archives of the /ftp/workshop tree.

        Generator: yields SSE maximum/progress events; the updated index is
        the generator's return value.
        """
        print("Démarrage de l'indexation des workshops ZIP 3GPP...")
        start_time = time.time()
        docs_count_before = len(self.indexer)

        print("\nIndexation du dossier 'workshop'")
        main_url = f"{self.main_ftp_url}/workshop"

        meeting_folders = self.get_docs_from_url(main_url)
        self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # workshop=True: accept ZIPs whose names do not match the TDoc pattern.
            futures = [executor.submit(self.process_meeting, meeting, main_url, workshop=True)
                       for meeting in meeting_folders if meeting not in ['./', '../']]
            total = len(futures)
            done_count = 0

            yield f"event: get-maximum\ndata: {total}\n\n"

            for _ in concurrent.futures.as_completed(futures):
                done_count += 1
                yield f"event: progress\ndata: {done_count}\n\n"

        docs_count_after = len(self.indexer)
        new_docs_count = docs_count_after - docs_count_before

        print(f"\nIndexation terminée en {time.time() - start_time:.2f} secondes")
        print(f"Nouveaux documents ZIP indexés: {new_docs_count}")
        print(f"Total des documents dans l'index: {docs_count_after}")

        return self.indexer
| |
|
class Spec3GPPIndexer:
    """Download 3GPP specifications, split them into sections and publish
    the content, metadata and BM25 indexes to the Hugging Face hub.

    Specs are fetched as ZIPped .doc/.docx from the 3GPP archive, converted
    to plain text with a headless LibreOffice, then split on chapter
    headings.  A content cache (dataset ``OrganizedProgrammers/3GPPSpecContent``)
    keyed by an md5 of spec number + version avoids re-downloading.
    """

    def __init__(self, max_workers=16):
        # Cached section-level content from the hub, and its per-spec view.
        self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        self.indexed_specifications = {}
        self.specifications_passed = set()
        self.processed_count = 0
        self.total_count = 0

        self.DICT_LOCK = threading.Lock()      # protects indexed_specifications / counters
        self.DOCUMENT_LOCK = threading.Lock()  # protects documents_by_spec_num
        self.STOP_EVENT = threading.Event()    # cooperative cancellation flag
        self.max_workers = max_workers
        # Bounds the number of concurrent LibreOffice conversions.
        self.LIBREOFFICE_SEMAPHORE = threading.Semaphore(self.max_workers)

    def _make_doc_index(self, specs):
        """Group flat section rows into {doc_id: {"content": {section: text}, "hash": h}}."""
        doc_index = {}
        for section in specs:
            if section["doc_id"] not in doc_index:
                doc_index[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
            else:
                doc_index[section["doc_id"]]["content"][section["section"]] = section["content"]
        return doc_index

    @staticmethod
    def version_to_code(version_str):
        """Convert an "x.y.z" version to the 3GPP archive file-name code.

        Components < 36 map to one base-36 character each ("18.1.0" -> "i10");
        larger ones are zero-padded decimal pairs.  Returns None when the
        string is not three dot-separated integers.
        """
        chars = "0123456789abcdefghijklmnopqrstuvwxyz"
        parts = version_str.split('.')
        if len(parts) != 3:
            return None
        try:
            x, y, z = [int(p) for p in parts]
        except ValueError:
            return None
        if x < 36 and y < 36 and z < 36:
            return f"{chars[x]}{chars[y]}{chars[z]}"
        else:
            return f"{str(x).zfill(2)}{str(y).zfill(2)}{str(z).zfill(2)}"

    @staticmethod
    def hasher(specification, version_code):
        """Cache key: md5 of spec number concatenated with version code."""
        return hashlib.md5(f"{specification}{version_code}".encode()).hexdigest()

    @staticmethod
    def get_scope(content):
        """Return the text of the section whose title ends with "scope", or ""."""
        for title, text in content.items():
            if title.lower().endswith("scope"):
                return text
        return ""

    def get_text(self, specification, version_code):
        """Download the spec ZIP and convert its .doc/.docx to text lines.

        Uses a throw-away LibreOffice user profile per conversion so parallel
        soffice instances do not clash.  Returns [] on any failure.
        """
        if self.STOP_EVENT.is_set():
            return []

        doc_id = specification
        series = doc_id.split(".")[0]
        url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"

        try:
            response = requests.get(url, verify=False)
            if response.status_code != 200:
                return []

            zip_bytes = io.BytesIO(response.content)
            with zipfile.ZipFile(zip_bytes) as zip_file:
                docx_files = [f for f in zip_file.namelist() if f.lower().endswith(('.doc', '.docx'))]
                if not docx_files:
                    return []

                full_text = []

                for doc_file in docx_files:
                    with tempfile.TemporaryDirectory() as tmpdir:
                        extracted_path = os.path.join(tmpdir, os.path.basename(doc_file))
                        with open(extracted_path, 'wb') as f:
                            f.write(zip_file.read(doc_file))

                        # Dedicated profile dir: parallel soffice runs would
                        # otherwise fight over the shared user installation.
                        profile_dir = tempfile.mkdtemp(prefix="libreoffice_profile_")

                        try:
                            with self.LIBREOFFICE_SEMAPHORE:
                                cmd = [
                                    'soffice',
                                    '--headless',
                                    f'-env:UserInstallation=file://{profile_dir}',
                                    '--convert-to', 'txt:Text',
                                    '--outdir', tmpdir,
                                    extracted_path
                                ]
                                subprocess.run(cmd, check=True, timeout=60*5, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                                txt_file = os.path.splitext(extracted_path)[0] + '.txt'
                                if os.path.exists(txt_file):
                                    with open(txt_file, 'r', encoding='utf-8', errors='ignore') as ftxt:
                                        full_text.extend(ftxt.readlines())
                        finally:
                            shutil.rmtree(profile_dir, ignore_errors=True)

            return full_text

        except Exception as e:
            print(f"Error getting text for {specification} v{version_code}: {e}")
            return []

    def get_spec_content(self, specification, version_code):
        """Split the spec's plain text into {chapter title: chapter body}."""
        if self.STOP_EVENT.is_set():
            return {}

        text = self.get_text(specification, version_code)
        if not text:
            return {}

        chapters = []
        # Chapter heading: "<number><TAB><Title>" not ending with a period.
        chapter_regex = re.compile(r"^(\d+[a-z]?(?:\.\d+)*)\t[A-Z0-9][\ \S]+[^\.]$")
        for i, line in enumerate(text):
            # Fix: match on the line without its trailing newline.  On raw
            # readlines() output the final [^\.] matched the "\n" itself, so
            # headings ending in "." slipped through and every stored chapter
            # title kept a trailing newline — which also defeated
            # get_scope()'s endswith("scope") check.
            heading = line.rstrip()
            if chapter_regex.fullmatch(heading):
                chapters.append((i, heading))

        document = {}
        for i in range(len(chapters)):
            start_index, chapter_title = chapters[i]
            end_index = chapters[i+1][0] if i+1 < len(chapters) else len(text)
            content_lines = text[start_index + 1:end_index]
            document[chapter_title.replace("\t", " ")] = "\n".join(content_lines)

        return document

    def fetch_spec_table(self):
        """Scrape the 3GPP status report and return one dict per spec row."""
        response = requests.get(
            'https://www.3gpp.org/dynareport?code=status-report.htm',
            headers={"User-Agent": 'Mozilla/5.0'},
            verify=False
        )
        dfs = pd.read_html(io.StringIO(response.text))
        for x in range(len(dfs)):
            dfs[x] = dfs[x].replace({np.nan: None})
        # Only the first five columns (spec number, title, type, version, WG) matter.
        columns_needed = [0, 1, 2, 3, 4]
        extracted_dfs = [df.iloc[:, columns_needed] for df in dfs]
        # Non-breaking spaces in headers become underscores ("spec_num", ...).
        columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]
        specifications = []
        for df in extracted_dfs:
            for index, row in df.iterrows():
                doc = row.to_list()
                doc_dict = dict(zip(columns, doc))
                specifications.append(doc_dict)
        return specifications

    def process_specification(self, spec):
        """Index one spec row: reuse the cache when the version hash matches,
        otherwise download and parse; record metadata either way."""
        if self.STOP_EVENT.is_set():
            return
        try:
            doc_id = str(spec['spec_num'])
            version_code = self.version_to_code(str(spec['vers']))
            if not version_code:
                # Unparsable version: count it so the progress total adds up.
                with self.DICT_LOCK:
                    self.processed_count += 1
                return

            document = None
            already_indexed = False
            with self.DOCUMENT_LOCK:
                doc_in_cache = doc_id in self.documents_by_spec_num and \
                    self.documents_by_spec_num[doc_id]["hash"] == self.hasher(doc_id, version_code)

            if doc_in_cache and doc_id not in self.specifications_passed:
                document = self.documents_by_spec_num[doc_id]
                self.specifications_passed.add(doc_id)
                already_indexed = True
            elif doc_id not in self.specifications_passed:
                doc_content = self.get_spec_content(doc_id, version_code)
                if doc_content:
                    document = {"content": doc_content, "hash": self.hasher(doc_id, version_code)}
                    with self.DOCUMENT_LOCK:
                        self.documents_by_spec_num[doc_id] = document
                    self.specifications_passed.add(doc_id)
                    already_indexed = False

            if document:
                url = f"https://www.3gpp.org/ftp/Specs/archive/{doc_id.split('.')[0]}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"
                metadata = {
                    "id": doc_id,
                    "title": spec.get("title", ""),
                    "type": spec.get("type", ""),
                    "version": str(spec.get("vers", "")),
                    "working_group": spec.get("WG", ""),
                    "url": url,
                    "scope": self.get_scope(document["content"])
                }
                key = f"{doc_id}+-+{spec.get('title', '')}+-+{spec.get('type', '')}+-+{spec.get('vers', '')}+-+{spec.get('WG', '')}"
                with self.DICT_LOCK:
                    self.indexed_specifications[key] = metadata

            with self.DICT_LOCK:
                self.processed_count += 1
                status = "already indexed" if already_indexed else "indexed now"
                print(f"Spec {doc_id} ({spec.get('title', '')}): {status} - Progress {self.processed_count}/{self.total_count}")

        except Exception as e:
            traceback.print_exc()
            print(f"Error processing spec {spec.get('spec_num')} v{spec.get('vers')}: {e}")
            with self.DICT_LOCK:
                self.processed_count += 1
                print(f"Progress: {self.processed_count}/{self.total_count} specs processed")

    def get_document(self, spec_id: str, spec_title: str):
        """Return the spec as a list of strings: header line + one per section."""
        text = [f"{spec_id} - {spec_title}\n"]
        for section in self.spec_contents:
            if spec_id == section["doc_id"]:
                text.extend([f"{section['section']}\n\n{section['content']}"])
        return text

    def create_bm25_index(self):
        """Build and push two BM25 indexes: one document per section, then
        one document per whole spec."""
        dataset_metadata = self.indexed_specifications.values()
        unique_specs = set()
        corpus_json = []

        # Per-section corpus.
        for specification in dataset_metadata:
            if specification['id'] in unique_specs: continue
            for section in self.spec_contents:
                if specification['id'] == section['doc_id']:
                    corpus_json.append({"text": f"{section['section']}\n{section['content']}", "metadata": {
                        "id": specification['id'],
                        "title": specification['title'],
                        "section_title": section['section'],
                        "version": specification['version'],
                        "type": specification['type'],
                        "working_group": specification['working_group'],
                        "url": specification['url'],
                        "scope": specification['scope']
                    }})
            # Fix: the dedup set was tested but never populated, so a spec id
            # appearing under several metadata keys was indexed repeatedly.
            unique_specs.add(specification['id'])

        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")

        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)

        retriever.save_to_hub("OrganizedProgrammers/3GPPBM25IndexSections", token=os.environ.get("HF"))

        unique_specs = set()
        corpus_json = []

        # Whole-document corpus; specs with no cached sections are skipped.
        for specification in dataset_metadata:
            if specification['id'] in unique_specs: continue
            text_list = self.get_document(specification['id'], specification['title'])
            text = "\n".join(text_list)
            if len(text_list) == 1: continue
            corpus_json.append({"text": text, "metadata": specification})
            unique_specs.add(specification['id'])

        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")

        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)

        retriever.save_to_hub("OrganizedProgrammers/3GPPBM25IndexSingle", token=os.environ.get("HF"))

    def run(self):
        """Fetch the spec table and process every row with a thread pool."""
        print("Fetching specification tables from 3GPP...")
        specifications = self.fetch_spec_table()
        self.total_count = len(specifications)
        print(f"Processing {self.total_count} specs with {self.max_workers} threads...")
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_specification, spec) for spec in specifications]
            for f in concurrent.futures.as_completed(futures):
                if self.STOP_EVENT.is_set():
                    break
        print("All specs processed.")

    def save(self):
        """Flatten the in-memory structures and push content + metadata
        datasets to the hub, then reload the content cache."""
        print("Saving indexed data...")
        flat_metadata = [metadata for metadata in self.indexed_specifications.values()]
        flat_docs = []
        print("Flatting doc contents")
        for doc_id, data in self.documents_by_spec_num.items():
            for title, content in data["content"].items():
                flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
        print("Creating datasets ...")
        push_spec_content = Dataset.from_list(flat_docs)
        push_spec_metadata = Dataset.from_list(flat_metadata)

        print("Pushing ...")
        push_spec_content.push_to_hub("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF"])
        push_spec_metadata.push_to_hub("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF"])

        self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        print("Save finished.")
| |
|
class SpecETSIIndexer:
    """Download ETSI TS/TR specifications as PDFs, split them into sections
    and publish content, metadata and BM25 indexes to the Hugging Face hub.

    The ETSI standards-search CSV export supplies the spec table; PDFs are
    parsed with PyMuPDF (fitz) and split along the PDF table of contents.
    A content cache (dataset ``OrganizedProgrammers/ETSISpecContent``) keyed
    by an md5 of spec number + version avoids re-downloading.
    """

    def __init__(self, max_workers=16):
        self.session = requests.Session()
        self.session.verify = False

        # Cached section-level content from the hub, and its per-spec view.
        self.spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        self.indexed_specifications = {}
        self.specifications_passed = set()
        self.processed_count = 0
        self.total_count = 0

        self.DICT_LOCK = threading.Lock()      # protects indexed_specifications / counters
        self.DOCUMENT_LOCK = threading.Lock()  # protects documents_by_spec_num
        self.STOP_EVENT = threading.Event()    # cooperative cancellation flag
        self.max_workers = max_workers

        # NOTE: network I/O at construction time (login + table download).
        self.df = self._fetch_spec_table()

    def _make_doc_index(self, specs):
        """Group flat section rows into {doc_id: {"content": {section: text}, "hash": h}}."""
        doc_index = {}
        for section in specs:
            if section["doc_id"] not in doc_index:
                doc_index[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
            else:
                doc_index[section["doc_id"]]["content"][section["section"]] = section["content"]
        return doc_index

    def _fetch_spec_table(self):
        """Log in to the ETSI portal and download the TS and TR spec tables.

        Returns a DataFrame with columns including "ETSI deliverable",
        "Version", "Type", "title" and "PDF link"; 3GPP-derived deliverables
        are filtered out.  All versions of a deliverable are kept — deduping
        happens downstream via ``specifications_passed``.
        """
        print("Connexion login ETSI...")
        self.session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            verify=False,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ..."},
            data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")}),
        )

        print("Récupération des métadonnées TS/TR …")
        url_ts = "https://www.etsi.org/?option=com_standardssearch&view=data&format=csv&includeScope=1&page=1&search=&title=1&etsiNumber=1&content=0&version=0&onApproval=0&published=1&withdrawn=0&historical=0&isCurrent=1&superseded=0&harmonized=0&keyword=&TB=&stdType=TS&frequency=&mandate=&collection=&sort=1"
        url_tr = url_ts.replace("stdType=TS", "stdType=TR")
        data_ts = self.session.get(url_ts, verify=False).content
        data_tr = self.session.get(url_tr, verify=False).content
        df_ts = pd.read_csv(io.StringIO(data_ts.decode('utf-8')), sep=";", skiprows=1, index_col=False)
        df_tr = pd.read_csv(io.StringIO(data_tr.decode('utf-8')), sep=";", skiprows=1, index_col=False)

        # Extract the bare deliverable number and the version from the
        # "ETSI TS/TR xxx yyy Vx.y.z" strings.
        backup_ts = df_ts["ETSI deliverable"]
        backup_tr = df_tr["ETSI deliverable"]
        df_ts["ETSI deliverable"] = df_ts["ETSI deliverable"].str.extract(r"\s*ETSI TS (\d+ \d+(?:-\d+(?:-\d+)?)?)")
        df_tr["ETSI deliverable"] = df_tr["ETSI deliverable"].str.extract(r"\s*ETSI TR (\d+ \d+(?:-\d+(?:-\d+)?)?)")
        version1 = backup_ts.str.extract(r"\s*ETSI TS \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
        version2 = backup_tr.str.extract(r"\s*ETSI TR \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
        df_ts["Version"] = version1[0]
        df_tr["Version"] = version2[0]

        df_ts["Type"] = "TS"
        df_tr["Type"] = "TR"
        df = pd.concat([df_ts, df_tr])
        # Fix: removed the dead "unique_df" computation — it was built with a
        # groupby/idxmax on the parsed version and then discarded.
        df = df[(~df["title"].str.contains("3GPP", case=True, na=False))]
        return df

    @staticmethod
    def hasher(specification: str, version: str):
        """Cache key: md5 of spec number concatenated with version."""
        return hashlib.md5(f"{specification}{version}".encode()).hexdigest()

    @staticmethod
    def get_scope(content):
        """Return the text of the section whose title ends with "scope", or ""."""
        for title, text in content.items():
            if title.lower().endswith("scope"):
                return text
        return ""

    def get_document(self, spec_id: str, spec_title: str):
        """Return the spec as a list of strings: header line + one per section."""
        text = [f"{spec_id} - {spec_title}\n"]
        for section in self.spec_contents:
            if spec_id == section["doc_id"]:
                text.extend([f"{section['section']}\n\n{section['content']}"])
        return text

    def get_text(self, specification: str):
        """Download the spec's PDF and return (fitz document, table of contents).

        Returns (None, []) when the spec is unknown, the download fails or an
        exception occurs.
        """
        if self.STOP_EVENT.is_set():
            return None, []
        print(f"\n[INFO] Tentative de récupération de la spécification {specification}", flush=True)
        try:
            row = self.df[self.df["ETSI deliverable"] == specification]
            if row.empty:
                print(f"[WARN] Spécification {specification} absente du tableau")
                return None, []

            pdf_link = row.iloc[0]["PDF link"]
            response = self.session.get(
                pdf_link,
                headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...'}
            )
            if response.status_code != 200:
                print(f"[ERREUR] Echec du téléchargement du PDF pour {specification}.")
                return None, []
            pdf = fitz.open(stream=response.content, filetype="pdf")
            return pdf, pdf.get_toc()
        except Exception as e:
            print(f"[ERROR] Échec get_text pour {specification} : {e}", flush=True)
            return None, []

    def get_spec_content(self, specification: str):
        """Extract {section title: section text} from the spec PDF, using the
        PDF table of contents to locate numbered section headings."""
        def extract_sections(text, titles):
            # Slice *text* between consecutive title occurrences (in document order).
            sections = {}
            sorted_titles = sorted(titles, key=lambda t: text.find(t))
            for i, title in enumerate(sorted_titles):
                start = text.find(title)
                if i + 1 < len(sorted_titles):
                    end = text.find(sorted_titles[i + 1])
                    sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:end].replace(title, "").strip().rstrip())
                else:
                    sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:].replace(title, "").strip().rstrip())
            return sections

        if self.STOP_EVENT.is_set():
            return {}
        print(f"[INFO] Extraction du contenu de {specification}", flush=True)
        pdf, doc_toc = self.get_text(specification)
        text = []
        if not pdf or not doc_toc:
            print("[ERREUR] Pas de texte ou table of contents trouvé !")
            return {}

        # Start reading at the page of the first ToC entry (skips cover pages).
        first_page = 0
        for level, title, page in doc_toc:
            first_page = page - 1
            break
        for page in pdf[first_page:]:
            text.append("\n".join([line.strip() for line in page.get_text().splitlines()]))
        text = "\n".join(text)
        if not text or not doc_toc or self.STOP_EVENT.is_set():
            print("[ERREUR] Pas de texte/table of contents récupéré !")
            return {}
        # Keep only numbered ToC titles actually present in the extracted text
        # (the number and the title body sit on separate lines in the PDF text).
        titles = []
        for level, title, page in doc_toc:
            if self.STOP_EVENT.is_set():
                return {}
            if title and title[0].isnumeric() and '\n'.join(title.strip().split(" ", 1)) in text:
                titles.append('\n'.join(title.strip().split(" ", 1)))
        return extract_sections(text, titles)

    def process_specification(self, spec):
        """Index one spec row: reuse the cache when the version hash matches,
        otherwise download and parse; record metadata either way."""
        if self.STOP_EVENT.is_set():
            return
        # Fix: resolve doc_id before the try block so the except handler can
        # always name the failing spec (it previously raised NameError when
        # the failure happened before doc_id was assigned).
        doc_id = str(spec.get("ETSI deliverable"))
        try:
            version = spec.get('Version')
            if not version:
                # Fix: still count the row, otherwise the progress total
                # never reaches total_count.
                with self.DICT_LOCK:
                    self.processed_count += 1
                return
            document = None
            already_indexed = False

            with self.DOCUMENT_LOCK:
                if (doc_id in self.documents_by_spec_num
                        and self.documents_by_spec_num[doc_id]["hash"] == self.hasher(doc_id, version)
                        and doc_id not in self.specifications_passed):
                    document = self.documents_by_spec_num[doc_id]
                    self.specifications_passed.add(doc_id)
                    already_indexed = True
                elif doc_id in self.specifications_passed:
                    document = self.documents_by_spec_num[doc_id]
                    already_indexed = True
                else:
                    document_content = self.get_spec_content(doc_id)
                    if document_content:
                        self.documents_by_spec_num[doc_id] = {"content": document_content, "hash": self.hasher(doc_id, version)}
                        document = {"content": document_content, "hash": self.hasher(doc_id, version)}
                        self.specifications_passed.add(doc_id)
                        already_indexed = False

            if document:
                string_key = f"{doc_id}+-+{spec['title']}+-+{spec['Type']}+-+{spec['Version']}"
                metadata = {
                    "id": str(doc_id),
                    "title": spec["title"],
                    "type": spec["Type"],
                    "version": version,
                    "url": spec["PDF link"],
                    "scope": "" if not document else self.get_scope(document["content"])
                }
                with self.DICT_LOCK:
                    self.indexed_specifications[string_key] = metadata

            with self.DICT_LOCK:
                self.processed_count += 1
                status = "already indexed" if already_indexed else "indexed now"
                print(f"Spec {doc_id} ({spec.get('title', '')}): {status} - Progress {self.processed_count}/{self.total_count}")

        except Exception as e:
            traceback.print_exc()
            print(f"\n[ERREUR] Échec du traitement de {doc_id} {spec.get('Version')}: {e}", flush=True)
            with self.DICT_LOCK:
                self.processed_count += 1
                print(f"Progress: {self.processed_count}/{self.total_count} specs processed")

    def run(self):
        """Process every row of the spec table with a thread pool."""
        print("Démarrage indexation ETSI…")
        specifications = self.df.to_dict(orient="records")
        self.total_count = len(specifications)
        print(f"Traitement de {self.total_count} specs avec {self.max_workers} threads...\n")

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_specification, spec) for spec in specifications]
            for f in concurrent.futures.as_completed(futures):
                if self.STOP_EVENT.is_set():
                    break

        print(f"\nAll {self.processed_count}/{self.total_count} specs processed.")

    def save(self):
        """Flatten the in-memory structures and push content + metadata
        datasets to the hub, then reload the content cache."""
        print("\nSauvegarde en cours...", flush=True)
        flat_metadata = [metadata for metadata in self.indexed_specifications.values()]
        flat_docs = []
        for doc_id, data in self.documents_by_spec_num.items():
            for title, content in data["content"].items():
                flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
        push_spec_content = Dataset.from_list(flat_docs)
        push_spec_metadata = Dataset.from_list(flat_metadata)
        push_spec_content.push_to_hub("OrganizedProgrammers/ETSISpecContent", token=os.environ["HF"])
        push_spec_metadata.push_to_hub("OrganizedProgrammers/ETSISpecMetadata", token=os.environ["HF"])

        self.spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent")["train"].to_list()
        self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
        print("Sauvegarde terminée.")

    def create_bm25_index(self):
        """Build and push two BM25 indexes: one document per section, then
        one document per whole spec."""
        dataset_metadata = self.indexed_specifications.values()
        unique_specs = set()
        corpus_json = []

        # Per-section corpus.
        for specification in dataset_metadata:
            if specification['id'] in unique_specs: continue
            for section in self.spec_contents:
                if specification['id'] == section['doc_id']:
                    corpus_json.append({"text": f"{section['section']}\n{section['content']}", "metadata": {
                        "id": specification['id'],
                        "title": specification['title'],
                        "section_title": section['section'],
                        "version": specification['version'],
                        "type": specification['type'],
                        "url": specification['url'],
                        "scope": specification['scope']
                    }})
            # Fix: the dedup set was tested but never populated, so a spec id
            # appearing under several metadata keys was indexed repeatedly.
            unique_specs.add(specification['id'])

        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")

        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)

        retriever.save_to_hub("OrganizedProgrammers/ETSIBM25IndexSections", token=os.environ.get("HF"))

        unique_specs = set()
        corpus_json = []

        # Whole-document corpus; specs with no cached sections are skipped.
        for specification in dataset_metadata:
            if specification['id'] in unique_specs: continue
            text_list = self.get_document(specification['id'], specification['title'])
            text = "\n".join(text_list)
            if len(text_list) == 1: continue
            corpus_json.append({"text": text, "metadata": specification})
            unique_specs.add(specification['id'])

        corpus_text = [doc["text"] for doc in corpus_json]
        corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")

        print("Indexing BM25")
        retriever = BM25HF(corpus=corpus_json)
        retriever.index(corpus_tokens)

        retriever.save_to_hub("OrganizedProgrammers/ETSIBM25IndexSingle", token=os.environ.get("HF"))
| | |