Spaces:
Sleeping
Sleeping
| """ | |
| etsi_client.py — ETSI document download helpers for ApplyCRs. | |
| Provides: | |
| CRFetcher — CR TDoc downloads via docbox.etsi.org | |
| TSFetcher — TS DOCX downloads via portal.etsi.org WKI chain | |
| """ | |
| import json | |
| import os | |
| import re | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from urllib.parse import urljoin | |
| import requests | |
| import urllib3 | |
| from bs4 import BeautifulSoup | |
| urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
def _get_proxies() -> dict:
    """Build the ``proxies`` mapping for requests from the environment.

    ``$http_proxy`` is consulted first, then ``$HTTP_PROXY``. The first
    non-empty value is used for both the ``http`` and ``https`` schemes.
    An empty dict is returned when neither variable holds a value.
    """
    for var_name in ("http_proxy", "HTTP_PROXY"):
        proxy_url = os.environ.get(var_name)
        if proxy_url:
            return {"http": proxy_url, "https": proxy_url}
    return {}
class CRFetcher:
    """Locate and download CR TDocs on docbox.etsi.org with an EOL login.

    An authenticated ``requests.Session`` is created in ``__init__`` and kept
    on ``self.session``. Requests that get redirected to the portal's
    ``LoginRedirection`` page trigger one re-login and one retry.
    """

    HEADERS = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/136.0.0.0 Safari/537.36"
        )
    }

    # Document-id prefixes mapped to their docbox folder. Order matters: the
    # generic "SET"/"SCP" prefixes also match the more specific ones.
    _TSG_PREFIXES = (
        (("SETREQ", "SCPREQ"), "SET-WG-R"),
        (("SETTEC", "SCPTEC"), "SET-WG-T"),
        (("SET", "SCP"), "SET"),
    )

    def __init__(self, eol_user: str, eol_password: str):
        """Store the credentials and log in immediately.

        A failed login does not raise (callers may still construct the
        object); the failure message is printed so it is no longer silent.
        """
        self.eol_user = eol_user
        self.eol_password = eol_password
        self.main_ftp_url = "https://docbox.etsi.org/SET"
        req_data = self.connect()
        if req_data["error"]:
            print(f"CRFetcher: {req_data['message']}")
        self.session = req_data["session"]

    def connect(self):
        """Log in to the ETSI portal and return a status dict.

        Returns:
            ``{"error": bool, "session": requests.Session, "message": str}``.
            On success the new session is also stored on ``self.session``.
        """
        session = requests.Session()
        session.headers.update(self.HEADERS)
        session.proxies.update(_get_proxies())
        # Seed DNN session cookies — docbox requires the portal session to be
        # initialised with domain=docbox.etsi.org so the .DOTNETNUKE cookie
        # is scoped to .etsi.org and accepted by docbox.etsi.org as well.
        login_redir_url = (
            "https://portal.etsi.org/LoginRedirection.aspx"
            "?domain=docbox.etsi.org&ReturnUrl=/"
        )
        session.get(login_redir_url, verify=False, timeout=15)
        req = session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=json.dumps({"username": self.eol_user, "password": self.eol_password}),
            headers={
                "Content-Type": "application/json; charset=UTF-8",
                "Referer": login_redir_url,
            },
            verify=False,
            allow_redirects=False,
            timeout=15,
        )
        # Strip before comparing so surrounding whitespace in the response
        # body cannot make a failed login look successful (same check as
        # TSFetcher._authenticate_eol).
        if req.text.strip() == "Failed":
            return {
                "error": True,
                "session": session,
                "message": "Login failed! Check your credentials",
            }
        self.session = session
        return {"error": False, "session": session, "message": "Login successful"}

    def _get(self, url: str, timeout: int):
        """GET ``url`` with the session, re-authenticating once if it expired.

        An expired session is detected by the final URL containing
        ``LoginRedirection`` after redirects have been followed.
        """
        resp = self.session.get(url, verify=False, timeout=timeout, allow_redirects=True)
        if resp.url and "LoginRedirection" in resp.url:
            self.connect()
            resp = self.session.get(
                url, verify=False, timeout=timeout, allow_redirects=True
            )
        return resp

    def download_document(self, url: str) -> bytes:
        """Download a docbox file using the authenticated session.

        If the session has expired the portal redirects to LoginRedirection —
        this is detected and the request is retried after re-authenticating.
        The response body is returned as-is; the HTTP status is not checked.
        """
        return self._get(url, timeout=30).content

    def get_workgroup(self, doc: str):
        """Split a document id into ``(folder, year, doc)``.

        The folder is chosen from the id prefix and the year is ``"20"`` plus
        the text inside the first pair of parentheses, e.g.
        ``"SETREQ(24)000123"`` -> ``("SET-WG-R", "2024", "SETREQ(24)000123")``.

        Returns ``(None, None, None)`` when the prefix is unknown or the id
        has no parenthesised part (the latter used to raise AttributeError).
        """
        main_tsg = next(
            (
                folder
                for prefixes, folder in self._TSG_PREFIXES
                if doc.startswith(prefixes)
            ),
            None,
        )
        if main_tsg is None:
            return None, None, None
        year_match = re.search(r"\(([^)]+)\)", doc)
        if year_match is None:
            return None, None, None
        return main_tsg, "20" + year_match.group(1), doc

    def find_workgroup_url(self, main_tsg, workgroup):
        """Return the contributions folder URL for ``workgroup``.

        The ``05-CONTRIBUTIONS`` listing is scanned for the first row whose
        link text contains ``workgroup``; if none is found the URL is built
        from ``workgroup`` directly.
        """
        contributions_url = f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS"
        response = self._get(contributions_url, timeout=15)
        soup = BeautifulSoup(response.text, "html.parser")
        for row in soup.find_all("tr"):
            link = row.find("a")
            if link and workgroup in link.get_text():
                return f"{contributions_url}/{link.get_text()}"
        return f"{contributions_url}/{workgroup}"

    def get_docs_from_url(self, url):
        """Return the link texts of a docbox directory listing.

        Best-effort: any error is printed and an empty list is returned.
        """
        try:
            response = self._get(url, timeout=15)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    @staticmethod
    def _entry_matches(doc_id: str, entry: str) -> bool:
        """Return True when ``entry`` contains ``doc_id``, ignoring case."""
        return doc_id.lower() in entry.lower()

    def search_document(self, doc_id: str):
        """Find the docbox URL of ``doc_id``.

        Entries of the workgroup folder are matched case-insensitively.
        Entries without a dot are treated as sub-folders and searched one
        level deep. When several URLs match, the last one found is returned.

        Returns:
            The document URL, or the string ``"Document <doc_id> not found"``.
        """
        not_found = f"Document {doc_id} not found"
        main_tsg, workgroup, _ = self.get_workgroup(doc_id)
        if not main_tsg:
            return not_found

        wg_url = self.find_workgroup_url(main_tsg, workgroup)
        urls = []
        for entry in self.get_docs_from_url(wg_url):
            if self._entry_matches(doc_id, entry):
                urls.append(f"{wg_url}/{entry}")
            elif "." not in entry.rstrip("/"):
                sub_url = f"{wg_url}/{entry}"
                urls.extend(
                    f"{sub_url}/{name}"
                    for name in self.get_docs_from_url(sub_url)
                    if self._entry_matches(doc_id, name)
                )
        return urls[-1] if urls else not_found
class TSFetcher:
    """Locate ETSI TS/TR deliverables and download their DOCX source.

    PDF locations are resolved from the public ``www.etsi.org/deliver``
    directory listings; DOCX files are fetched through the authenticated
    portal.etsi.org work-item (``wki_id``) redirect chain.
    """

    def __init__(self, eol_user: str, eol_password: str):
        """Store the EOL credentials and the base URLs; performs no I/O."""
        self.eol_user = eol_user
        self.eol_password = eol_password
        self.main_url = "https://www.etsi.org/deliver/etsi_ts"
        self.second_url = "https://www.etsi.org/deliver/etsi_tr"
        self.headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/136.0.0.0 Safari/537.36"
            )
        }

    def get_spec_path(self, doc_id: str):
        """Return the ``<range>/<folder>`` path of a spec under ``deliver/``.

        ``"102 221"`` -> ``"102200_102299/102221"``;
        ``"102 230-2"`` -> ``"102200_102299/10223002"``.
        Every ``-part`` is zero-padded to two digits and appended, so ids
        with several parts (``"300 392-3-1"``) no longer raise ValueError.

        Raises:
            ValueError: if the spec number is not numeric.
        """
        number, *parts = doc_id.split("-")
        number = number.replace(" ", "")
        suffix = "".join(p.strip().zfill(2) for p in parts if p.strip())
        range_start = int(number) - int(number) % 100
        return f"{range_start}_{range_start + 99}/{number}{suffix}"

    def get_docs_from_url(self, url):
        """Return the link texts of a ``deliver`` directory listing.

        The first link of the page is dropped. Best-effort: any error is
        printed and an empty list is returned.
        """
        try:
            response = requests.get(
                url, verify=False, timeout=15, proxies=_get_proxies()
            )
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.find_all("a")][1:]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def _normalise_version(self, version: str) -> str:
        """Normalise a user-supplied version string to ETSI zero-padded format.

        '17.6.0' -> '17.06.00' (the '_60' release suffix is ignored during
        matching). Already-normalised strings like '17.06.00' are returned
        unchanged, as is anything that is not three dot-separated integers
        (apart from stripping surrounding slashes).
        """
        cleaned = version.strip("/")
        parts = cleaned.split(".")
        if len(parts) == 3:
            try:
                return ".".join(f"{int(p):02d}" for p in parts)
            except ValueError:
                pass
        return cleaned

    def _plain_version(self, version: str) -> str:
        """Return ``version`` without zero padding ('17.06.00' -> '17.6.0').

        This is the form used in the search queries ("V1.2.1"). Strings that
        are not three dot-separated integers are returned unchanged.
        """
        parts = version.strip("/").split(".")
        if len(parts) == 3:
            try:
                return ".".join(str(int(p)) for p in parts)
            except ValueError:
                pass
        return version

    def _pick_release(self, releases: list, version: str = None) -> str:
        """Return the release folder matching version, or the last listed one.

        ``releases`` must be non-empty (callers check this before calling).
        """
        if version:
            target = self._normalise_version(version)
            for release in releases:
                if release.strip("/").split("_")[0] == target:
                    return release
        return releases[-1]

    def search_document(self, doc_id: str, version: str = None):
        """Return the PDF URL of a spec, trying ``etsi_ts`` then ``etsi_tr``.

        Returns:
            The URL of the first ``.pdf`` in the selected release folder, or
            the string ``"Specification <doc_id> not found"``.
        """
        spec_path = self.get_spec_path(doc_id)
        base_urls = [
            f"{root}/{spec_path}/" for root in (self.main_url, self.second_url)
        ]
        for base in base_urls:
            print(base)

        for base in base_urls:
            releases = self.get_docs_from_url(base)
            if not releases:
                continue
            release = self._pick_release(releases, version)
            for name in self.get_docs_from_url(base + release):
                if name.endswith(".pdf"):
                    # rstrip avoids a "//" in the result, which would break
                    # the version-folder parsing in _get_wki_id_candidates.
                    return base + release.rstrip("/") + "/" + name
        return f"Specification {doc_id} not found"

    def _search_wki_ids(self, doc_id: str, version_str: str) -> list:
        """Query the ETSI standards search and return matching wki_ids.

        Queries run from most to least specific and stop at the first one
        that yields hits. A failing query is printed and skipped.
        """
        import datetime

        spec_num = doc_id.split("-")[0].replace(" ", "")
        base_params = {
            "format": "json",
            "page": "1",
            "title": "1",
            "etsiNumber": "1",
            "content": "1",
            "version": "0",
            "onApproval": "1",
            "published": "1",
            "withdrawn": "1",
            "historical": "1",
            "isCurrent": "1",
            "superseded": "1",
            "startDate": "1988-01-15",
            "endDate": datetime.date.today().isoformat(),
            "harmonized": "0",
            "keyword": "",
            "TB": "",
            "stdType": "",
            "frequency": "",
            "mandate": "",
            "collection": "",
            "sort": "1",
        }
        # ETSI UI sends capital-V version; try both to be safe
        queries = [
            f"{doc_id} V{version_str}",  # e.g. "104 005 V1.2.1" (UI format)
            f"{doc_id} v{version_str}",  # e.g. "104 005 v1.2.1"
            doc_id,  # e.g. "104 005" (wider net)
        ]
        found = {}  # dict used as an insertion-ordered set
        for query in queries:
            try:
                resp = requests.get(
                    "https://www.etsi.org/custom/standardssearch/data.php",
                    params={**base_params, "search": query},
                    headers={
                        **self.headers,
                        "Referer": "https://www.etsi.org/standards/",
                    },
                    verify=False,
                    timeout=15,
                    proxies=_get_proxies(),
                )
                data = resp.json()
                if data and isinstance(data, list):
                    hits = [
                        str(item["wki_id"])
                        for item in data
                        if "wki_id" in item and spec_num in json.dumps(item)
                    ]
                    found.update(dict.fromkeys(hits))
                    if hits:
                        print(f" wki_id search query={query!r} → {len(hits)} hit(s)")
                        break
            except Exception as e:
                # Deliberately broad: the search is best-effort and the next
                # (wider) query may still succeed.
                print(f"Error getting wki_id for {doc_id} (query={query!r}): {e}")
        return list(found)

    def _get_wki_id_candidates(self, doc_id: str, version: str = None) -> tuple:
        """Return (candidates, version_str) for a spec version (best match first).

        When ``version`` is omitted it is derived from the release folder of
        the PDF found by ``search_document``. A supplied version is reduced
        to its unpadded form so '17.06.00' and '17.6.0' search identically.
        Returns ``([], "")`` when no version can be determined.
        """
        if version:
            version_str = self._plain_version(version)
        else:
            pdf_url = self.search_document(doc_id)
            if "not found" in pdf_url.lower():
                return [], ""
            version_folder = pdf_url.rstrip("/").split("/")[-2]  # e.g. "18.04.00_60"
            v_parts = version_folder.split("_")[0].split(".")  # ["18", "04", "00"]
            try:
                version_str = f"{int(v_parts[0])}.{int(v_parts[1])}.{int(v_parts[2])}"
            except (ValueError, IndexError):
                return [], ""
        return self._search_wki_ids(doc_id, version_str), version_str

    def _authenticate_eol(self) -> "requests.Session":
        """Create a requests.Session authenticated to the ETSI EOL portal.

        Raises:
            RuntimeError: if the portal answers the login with "Failed".
        """
        session = requests.Session()
        session.headers.update({"User-Agent": self.headers["User-Agent"]})
        session.proxies.update(_get_proxies())
        login_redir_url = (
            "https://portal.etsi.org/LoginRedirection.aspx"
            "?domain=docbox.etsi.org&ReturnUrl=/"
        )
        session.get(login_redir_url, verify=False, timeout=15)
        login_resp = session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=json.dumps({"username": self.eol_user, "password": self.eol_password}),
            headers={
                "Content-Type": "application/json; charset=UTF-8",
                "Referer": login_redir_url,
            },
            verify=False,
            allow_redirects=False,
            timeout=15,
        )
        if login_resp.text.strip() == "Failed":
            raise RuntimeError(
                "ETSI EOL login failed — check EOL_USER / EOL_PASSWORD"
            )
        return session

    def _choose_docx_url(self, docx_urls, doc_id, version_str, version_tag, wki_id):
        """Pick the DOCX link matching the spec number and requested version.

        Returns the chosen URL, or None (after printing the reason) when no
        link belongs to the spec or none carries the requested version.
        """
        spec_num = doc_id.split("-")[0].replace(" ", "")
        matching_urls = [u for u in docx_urls if spec_num in u.split("/")[-1]]
        if not matching_urls:
            print(
                f" wki_id={wki_id}: DOCX spec mismatch "
                f"(expected {spec_num}), trying next"
            )
            return None
        if not version_tag:
            return matching_urls[0]

        version_candidates = [
            version_tag,  # "010201"
            f"v{version_tag}",  # "v010201"
            version_str.replace(".", ""),  # "121"
            version_str,  # "1.2.1"
            version_str.replace(".", "_"),  # "1_2_1"
        ]
        for tag in version_candidates:
            versioned_urls = [u for u in matching_urls if tag in u.split("/")[-1]]
            if versioned_urls:
                return versioned_urls[0]

        found_names = [u.split("/")[-1] for u in matching_urls]
        # Decode the available version from the first filename (e.g. v160500 → 16.5.0)
        avail_ver = None
        m = re.search(r'v(\d{6})p?', found_names[0])
        if m:
            t = m.group(1)
            avail_ver = f"{int(t[0:2])}.{int(t[2:4])}.{int(t[4:6])}"
        if avail_ver:
            print(
                f"\n *** WARNING ***\n"
                f" TS {doc_id} v{version_str} is not available on the ETSI portal.\n"
                f" Portal has v{avail_ver} (file: {found_names[0]}).\n"
                f" Options: target v{avail_ver} in your CR, or drop the TS DOCX manually.\n"
            )
        else:
            print(
                f" wki_id={wki_id}: version tag not in filenames {found_names}, "
                f"rejecting (wrong version would be downloaded)"
            )
        return None

    def _try_wki(self, auth_session, wki_id, doc_id, version_str, version_tag):
        """Follow the portal redirect chain for one wki_id and save the DOCX.

        Returns the local file path, or None when this candidate is rejected
        at any step (the reason is printed).
        """
        import tempfile

        print(f"Trying wki_id={wki_id} for {doc_id}")
        # Each candidate gets its own session seeded with the shared login
        # cookies, since candidates run in parallel threads.
        session = requests.Session()
        session.headers.update({"User-Agent": self.headers["User-Agent"]})
        session.proxies.update(_get_proxies())
        session.cookies.update(auth_session.cookies)
        # Step 1: LogonRedirection.asp registers the download intent server-side,
        # generates a one-time profile_id, then 302s to NTaccount.asp.
        # allow_redirects=True means the final response IS the NTaccount.asp page.
        # Do NOT call NTaccount.asp again — a second call invalidates profile_id A
        # and the server rejects the new profile_id B with "Your identifier is wrong".
        r_logon = session.get(
            f"https://portal.etsi.org/webapp/workprogram/LogonRedirection.asp"
            f"?wki_id={wki_id}",
            verify=False,
            timeout=15,
            allow_redirects=True,
        )
        meta_match = re.search(r"URL=([^\"'\s>]+)", r_logon.text)
        if not meta_match:
            print(
                f" wki_id={wki_id}: authentication failed "
                f"(no URL= in NTaccount.asp), trying next"
            )
            return None
        meta_url = urljoin(r_logon.url, meta_match.group(1))
        r2 = session.get(meta_url, allow_redirects=False, verify=False, timeout=15)
        if r2.status_code != 302:
            print(
                f" wki_id={wki_id}: unexpected status {r2.status_code}, trying next"
            )
            return None
        location2 = r2.headers.get("Location", "")
        if "processerror" in location2.lower():
            print(f" wki_id={wki_id}: portal rejected ({location2}), trying next")
            return None
        copy_url = urljoin("https://portal.etsi.org/", location2)
        r3 = session.get(copy_url, allow_redirects=False, verify=False, timeout=15)
        if r3.status_code == 302:
            location3 = r3.headers.get("Location", "")
            final_url = urljoin("https://portal.etsi.org/webapp/ewp/", location3)
            r4 = session.get(final_url, verify=False, timeout=15)
        else:
            r4 = r3
        docx_urls = re.findall(
            r'href=["\']([^"\']*\.docx)["\']', r4.text, re.IGNORECASE
        )
        if not docx_urls:
            print(f" wki_id={wki_id}: DOCX not found in page, trying next")
            return None
        chosen = self._choose_docx_url(
            docx_urls, doc_id, version_str, version_tag, wki_id
        )
        if chosen is None:
            return None
        # Resolve a relative href against the page it came from; absolute
        # URLs pass through urljoin unchanged.
        docx_url = urljoin(r4.url, chosen)
        dl = session.get(
            docx_url,
            headers={"Referer": r4.url},
            verify=False,
            timeout=60,
        )
        # A DOCX is a ZIP archive ("PK" magic). Reject error or login pages
        # instead of saving them under a .docx name and reporting success.
        if not dl.ok or not dl.content.startswith(b"PK"):
            print(
                f" wki_id={wki_id}: download failed "
                f"(HTTP {dl.status_code}, not a DOCX), trying next"
            )
            return None
        filename = docx_url.split("/")[-1]
        tmp_path = os.path.join(tempfile.gettempdir(), filename)
        with open(tmp_path, "wb") as f:
            f.write(dl.content)
        print(f" wki_id={wki_id}: success")
        return tmp_path

    def search_document_docx(self, doc_id: str, version: str = None) -> str:
        """Download an ETSI spec as DOCX and return the local file path.

        Up to four wki_id candidates are tried in parallel and the first one
        that yields a file wins. A candidate that raises (e.g. a timeout) is
        logged and skipped rather than aborting the whole search.

        Returns:
            The path of the saved file, or a message string when the spec is
            not found or every candidate was rejected.

        Raises:
            RuntimeError: if the EOL login fails.
        """
        candidates, version_str = self._get_wki_id_candidates(doc_id, version)
        if not candidates:
            return f"Specification {doc_id} not found"
        try:
            version_tag = "".join(f"{int(p):02d}" for p in version_str.split("."))
        except (ValueError, AttributeError):
            version_tag = ""
        auth_session = self._authenticate_eol()

        executor = ThreadPoolExecutor(max_workers=min(len(candidates), 4))
        try:
            futures = {
                executor.submit(
                    self._try_wki,
                    auth_session,
                    wki_id,
                    doc_id,
                    version_str,
                    version_tag,
                ): wki_id
                for wki_id in candidates
            }
            for future in as_completed(futures):
                try:
                    result = future.result()
                except Exception as exc:
                    print(
                        f" wki_id={futures[future]}: failed with {exc!r}, trying next"
                    )
                    continue
                if result is not None:
                    for pending in futures:
                        pending.cancel()
                    return result
        finally:
            # Do not block on candidates that are still running.
            executor.shutdown(wait=False)
        return f"Specification {doc_id}: all {len(candidates)} wki_id candidate(s) rejected"