Spaces:
Running
Running
| from fastapi import HTTPException | |
| import requests | |
| import re | |
| from bs4 import BeautifulSoup | |
| import os | |
| import json | |
| def _get_proxies() -> dict: | |
| """Return a requests-compatible proxies dict from $http_proxy / $HTTP_PROXY.""" | |
| proxy = os.environ.get("http_proxy") or os.environ.get("HTTP_PROXY") or "" | |
| if not proxy: | |
| return {} | |
| return {"http": proxy, "https": proxy} | |
class ETSIDocFinder:
    """Find and download contribution documents on the ETSI docbox server.

    Construction logs in to the ETSI portal with the ``EOL_USER`` /
    ``EOL_PASSWORD`` environment variables and keeps the resulting
    ``requests.Session`` in ``self.session``.

    NOTE(review): every request in this class passes ``verify=False``, i.e.
    TLS certificate verification is disabled — confirm this is intentional.
    """

    HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}

    def __init__(self):
        self.main_ftp_url = "https://docbox.etsi.org/SET"
        req_data = self.connect()
        print(req_data['message'])
        # The session is kept even when login failed (see connect()).
        self.session = req_data['session']

    def connect(self):
        """Open a new session and log in to the ETSI portal.

        Returns:
            A dict with keys ``error`` (bool), ``session`` (the new
            ``requests.Session``) and ``message`` (human-readable status).
            On success ``self.session`` is also replaced by the new session.
        """
        session = requests.Session()
        session.headers.update(self.HEADERS)
        session.proxies.update(_get_proxies())

        # Seed DNN session cookies — docbox requires the portal session to be
        # initialised with domain=docbox.etsi.org so the .DOTNETNUKE cookie
        # is scoped to .etsi.org and accepted by docbox.etsi.org as well.
        login_redir_url = (
            "https://portal.etsi.org/LoginRedirection.aspx"
            "?domain=docbox.etsi.org&ReturnUrl=/"
        )
        session.get(login_redir_url, verify=False, timeout=15)

        req = session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=json.dumps({"username": os.environ.get("EOL_USER"),
                             "password": os.environ.get("EOL_PASSWORD")}),
            headers={"Content-Type": "application/json; charset=UTF-8",
                     "Referer": login_redir_url},
            verify=False,
            allow_redirects=False,
            timeout=15,
        )
        if req.text == "Failed":
            return {"error": True, "session": session, "message": "Login failed ! Check your credentials"}

        # Only a successfully authenticated session replaces self.session, so
        # a failed re-auth leaves the session currently in use untouched.
        self.session = session
        return {"error": False, "session": session, "message": "Login successful"}

    def download_document(self, url: str) -> bytes:
        """Download a docbox file using the authenticated session.

        If the session has expired the portal redirects to LoginRedirection —
        we detect this and re-authenticate once before retrying.

        Returns:
            The raw response body of the (possibly retried) request.
        """
        resp = self.session.get(url, verify=False, timeout=30, allow_redirects=True)
        # Detect auth redirect (portal login page returned instead of file)
        if resp.url and "LoginRedirection" in resp.url:
            self.connect()  # replaces self.session when the login succeeds
            resp = self.session.get(url, verify=False, timeout=30, allow_redirects=True)
        return resp.content

    def get_workgroup(self, doc: str):
        """Derive the docbox folder and year for a document identifier.

        The prefix selects the folder (``SETREQ``/``SCPREQ`` -> ``SET-WG-R``,
        ``SETTEC``/``SCPTEC`` -> ``SET-WG-T``, other ``SET``/``SCP`` ->
        ``SET``); the text inside the first parentheses is prefixed with
        ``"20"`` to form the year.

        Returns:
            ``(main_tsg, workgroup, doc)``, or ``(None, None, None)`` when the
            prefix is unknown or the identifier has no ``(yy)`` part.
        """
        # Longest prefixes first: "SETREQ" also starts with "SET".
        if doc.startswith(("SETREQ", "SCPREQ")):
            main_tsg = "SET-WG-R"
        elif doc.startswith(("SETTEC", "SCPTEC")):
            main_tsg = "SET-WG-T"
        elif doc.startswith(("SET", "SCP")):
            main_tsg = "SET"
        else:
            return None, None, None

        year_match = re.search(r'\(([^)]+)\)', doc)
        if year_match is None:
            # No "(yy)" in the identifier: treat it as unknown rather than crash.
            return None, None, None
        return main_tsg, "20" + year_match.group(1), doc

    def find_workgroup_url(self, main_tsg, workgroup):
        """Return the URL of the contributions folder for *workgroup*.

        The ``05-CONTRIBUTIONS`` listing of *main_tsg* is scanned for the first
        table-row link whose text contains *workgroup*; if none is found the
        URL is built from *workgroup* itself.
        """
        url = f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS"
        response = self.session.get(url, verify=False, timeout=15)
        # If docbox redirected to the portal login page, reauth and retry once
        if "LoginRedirection" in response.url:
            self.connect()
            response = self.session.get(url, verify=False, timeout=15)

        soup = BeautifulSoup(response.text, 'html.parser')
        for item in soup.find_all("tr"):
            link = item.find("a")
            if link and workgroup in link.get_text():
                return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{link.get_text()}"
        return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{workgroup}"

    def get_docs_from_url(self, url):
        """Return the text of every ``tr td a`` link on the page at *url*.

        Best-effort: any error is printed and an empty list is returned.
        """
        try:
            response = self.session.get(url, verify=False, timeout=15)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    @staticmethod
    def _matches(doc: str, entry: str) -> bool:
        """Return True when *entry* contains *doc*, ignoring case."""
        return doc.lower() in entry.lower()

    def search_document(self, doc_id: str):
        """Return the URL of the document *doc_id*, or a "not found" message.

        The workgroup folder is listed and, for entries that look like
        sub-directories (no "." in the name), listed one level deeper. When
        several entries match, the last one found is returned.
        """
        not_found = f"Document {doc_id} not found"
        main_tsg, workgroup, doc = self.get_workgroup(doc_id)
        if not main_tsg:
            return not_found

        wg_url = self.find_workgroup_url(main_tsg, workgroup)
        print(wg_url)
        entries = self.get_docs_from_url(wg_url)
        print(entries)

        urls = []
        for entry in entries:
            if self._matches(doc, entry):
                urls.append(f"{wg_url}/{entry}")
            elif "." not in entry.rstrip("/"):
                # looks like a subdirectory — go one level deeper
                sub_url = f"{wg_url}/{entry}"
                for f in self.get_docs_from_url(sub_url):
                    if self._matches(doc, f):
                        print(f)
                        urls.append(f"{sub_url}/{f}")
        return urls[-1] if urls else not_found
class ETSISpecFinder:
    """Locate published ETSI TS/TR specifications (PDF and DOCX).

    NOTE(review): every request in this class passes ``verify=False``, i.e.
    TLS certificate verification is disabled — confirm this is intentional.
    """

    def __init__(self):
        self.main_url = "https://www.etsi.org/deliver/etsi_ts"
        self.second_url = "https://www.etsi.org/deliver/etsi_tr"
        self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}

    def get_spec_path(self, doc_id: str):
        """Return the ``<range>/<folder>`` path fragment for a spec number.

        ``"103 666-2"`` -> ``"103600_103699/10366602"``. Spaces are ignored and
        every ``-part`` is left-padded to two digits and appended to the
        number, so multi-part ids such as ``"300 392-3-1"`` are accepted too.

        Raises:
            ValueError: if the number before the first ``-`` is not an integer.
        """
        number, *parts = (piece.replace(" ", "") for piece in doc_id.split("-"))
        suffix = "".join(part.zfill(2) for part in parts if part)
        range_start = int(number) - int(number) % 100
        return f"{range_start}_{range_start + 99}/{number}{suffix}"

    def get_docs_from_url(self, url):
        """Return the link texts found on the page at *url*, minus the first.

        The first link is dropped (presumably the parent-directory entry of
        the listing — TODO confirm). Best-effort: any error is printed and an
        empty list is returned.
        """
        try:
            response = requests.get(url, verify=False, timeout=15, proxies=_get_proxies())
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.find_all("a")][1:]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def _normalise_version(self, version: str) -> str:
        """Normalise a user-supplied version string to ETSI zero-padded format.

        '17.6.0' -> '17.06.00' (the '_60' release suffix is ignored during
        matching). Already-normalised strings like '17.06.00' are returned
        unchanged, as is anything that is not three dot-separated integers
        (only surrounding slashes are stripped).
        """
        cleaned = version.strip("/")
        fields = cleaned.split(".")
        if len(fields) != 3:
            return cleaned
        try:
            return ".".join(f"{int(field):02d}" for field in fields)
        except ValueError:
            return cleaned

    def _pick_release(self, releases: list, version: str = None) -> str:
        """Return the release folder matching version, or the last one listed.

        *releases* must be non-empty; the last entry is used as the fallback
        when *version* is not given or not found.
        """
        if version:
            target = self._normalise_version(version)
            for r in releases:
                # folder names are like '17.06.00_60'; match on the part before '_'
                if r.strip("/").split("_")[0] == target:
                    return r
        return releases[-1]

    def search_document(self, doc_id: str, version: str = None):
        """Return the PDF URL of a specification, or a "not found" message.

        The TS tree is searched first, then the TR tree. *doc_id* looks like
        ``"103 666"`` with an optional ``-2`` part suffix.
        """
        spec_path = self.get_spec_path(doc_id)
        base_urls = [f"{root}/{spec_path}/" for root in (self.main_url, self.second_url)]
        for base_url in base_urls:
            print(base_url)

        for base_url in base_urls:
            releases = self.get_docs_from_url(base_url)
            if not releases:
                continue
            # Strip slashes so a listing entry like "17.06.00_60/" cannot
            # produce a double slash in the returned URL.
            release_url = base_url + self._pick_release(releases, version).strip("/")
            for f in self.get_docs_from_url(release_url):
                if f.endswith(".pdf"):
                    return f"{release_url}/{f}"
        return f"Specification {doc_id} not found"

    @staticmethod
    def _unpad_version(version_folder: str) -> str:
        """Convert '18.04.00_60' or '18.04.00' to the un-padded '18.4.0'.

        Raises:
            ValueError, IndexError: if the text before '_' does not start with
            three dot-separated integers.
        """
        fields = version_folder.strip("/").split("_")[0].split(".")
        return f"{int(fields[0])}.{int(fields[1])}.{int(fields[2])}"

    def _get_wki_id(self, doc_id: str, version: str = None) -> str:
        """Return the ETSI portal wki_id for a spec version, or None if not found."""
        if version:
            # Send the same un-padded form ('17.6.0') as the derived path
            # below; fall back to the raw text when it is not 'a.b.c'.
            version_str = version
            if len(version.strip("/").split(".")) == 3:
                try:
                    version_str = self._unpad_version(version)
                except ValueError:
                    pass
        else:
            # Derive version from the FTP PDF URL
            pdf_url = self.search_document(doc_id)
            if "not found" in pdf_url.lower():
                return None
            # URL path: .../18.04.00_60/ts_...p.pdf → folder is parts[-2]
            parts = pdf_url.rstrip("/").split("/")
            try:
                version_str = self._unpad_version(parts[-2])
            except (ValueError, IndexError):
                return None

        for spec_type in ["TS", "TR"]:
            params = {
                "option": "com_standardssearch",
                "view": "data",
                "format": "json",
                "page": "1",
                "search": f"ETSI {spec_type} {doc_id} v{version_str}",
                "etsiNumber": "1",
                "published": "1",
            }
            try:
                resp = requests.get("https://www.etsi.org/", params=params,
                                    headers=self.headers, verify=False, timeout=15,
                                    proxies=_get_proxies())
                data = resp.json()
                if data and isinstance(data, list):
                    return str(data[0]["wki_id"])
            except Exception as e:
                print(f"Error getting wki_id for {doc_id}: {e}")
        return None

    def _authenticate_eol(self, wki_id: str) -> "requests.Session":
        """Create a requests.Session authenticated to the ETSI EOL portal.

        The login response is not inspected here; a failed login surfaces
        later when the download chain cannot be followed.
        """
        session = requests.Session()
        session.headers.update({"User-Agent": self.headers["User-Agent"]})
        session.proxies.update(_get_proxies())

        login_redir_url = (
            f"https://portal.etsi.org/LoginRedirection.aspx"
            f"?ReturnUrl=%2fwebapp%2fprotect%2fNTaccount.asp%3fWki_Id%3d{wki_id}"
            f"&Wki_Id={wki_id}"
        )
        # Seed DNN session cookies
        session.get(login_redir_url, verify=False, timeout=15)
        # Authenticate via EOL JSON login
        session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=json.dumps({"username": os.environ.get("EOL_USER"),
                             "password": os.environ.get("EOL_PASSWORD")}),
            headers={"Content-Type": "application/json; charset=UTF-8",
                     "Referer": login_redir_url},
            verify=False,
            allow_redirects=False,
            timeout=15,
        )
        return session

    def search_document_docx(self, doc_id: str, version: str = None) -> str:
        """Download an ETSI spec as DOCX and return the local file path.

        Returns:
            The path of the file written under ``/tmp``, or a message starting
            with ``"Specification <doc_id>"`` describing which step failed.
        """
        from urllib.parse import urljoin

        wki_id = self._get_wki_id(doc_id, version)
        if not wki_id:
            return f"Specification {doc_id} not found"

        session = self._authenticate_eol(wki_id)

        # NTaccount.asp → parse profile_id from meta-refresh
        r = session.get(
            f"https://portal.etsi.org/webapp/protect/NTaccount.asp?Wki_Id={wki_id}",
            verify=False, timeout=15,
        )
        meta_match = re.search(r'URL=([^"\'\s>]+)', r.text)
        if not meta_match:
            return f"Specification {doc_id}: authentication failed"
        meta_url = meta_match.group(1)
        if not meta_url.startswith("http"):
            meta_url = f"https://portal.etsi.org/webapp/protect/{meta_url}"

        # CheckIdentifier → 302 to copy_file
        r2 = session.get(meta_url, allow_redirects=False, verify=False, timeout=15)
        if r2.status_code != 302:
            return f"Specification {doc_id}: download chain failed"

        # copy_file (may have a second redirect). urljoin keeps the original
        # result for relative Location values and also copes with absolute ones.
        copy_url = urljoin("https://portal.etsi.org", r2.headers["Location"])
        r3 = session.get(copy_url, allow_redirects=False, verify=False, timeout=15)
        if r3.status_code == 302:
            final_url = urljoin("https://portal.etsi.org/webapp/ewp/", r3.headers["Location"])
            r4 = session.get(final_url, verify=False, timeout=15)
        else:
            r4 = r3

        # Extract DOCX link
        docx_urls = re.findall(r'href=["\']([^"\']*\.docx)["\']', r4.text, re.IGNORECASE)
        if not docx_urls:
            return f"Specification {doc_id}: DOCX not available"
        # The href may be relative to the page it was found on.
        docx_url = urljoin(r4.url, docx_urls[0])

        # Download
        dl = session.get(docx_url, headers={"Referer": r4.url}, verify=False, timeout=60)
        if dl.status_code != 200 or not dl.content:
            # Do not save an error page or an empty body as a .docx file.
            return f"Specification {doc_id}: DOCX not available"

        filename = docx_url.split("/")[-1]
        tmp_path = f"/tmp/{filename}"
        with open(tmp_path, "wb") as f:
            f.write(dl.content)
        return tmp_path