# DocFinder / classes.py
# heymenn's picture
# show more info to catch errors on etsi login page
# 801e72f
# raw
# history blame
# 14.6 kB
from fastapi import HTTPException
import requests
import re
from bs4 import BeautifulSoup
import os
import json
from urllib.parse import urljoin
def _get_proxies() -> dict:
    """Build a requests-style ``proxies`` mapping from the environment.

    ``$http_proxy`` is consulted first, then ``$HTTP_PROXY``. The first
    non-empty value is used for both the ``http`` and ``https`` schemes.
    When neither variable holds a value an empty dict is returned.
    """
    for env_name in ("http_proxy", "HTTP_PROXY"):
        proxy_url = os.environ.get(env_name)
        if proxy_url:
            return dict.fromkeys(("http", "https"), proxy_url)
    return {}
class ETSIDocFinder:
    """Find and download SET/SCP contribution documents on docbox.etsi.org.

    Creating an instance logs in to the ETSI portal using the ``EOL_USER``
    and ``EOL_PASSWORD`` environment variables and stores the resulting
    ``requests.Session`` in ``self.session``.
    """

    # NOTE(review): every request issued by this class passes verify=False,
    # i.e. TLS certificate verification is disabled — confirm this is
    # intentional for the deployment environment.
    HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}

    def __init__(self):
        self.main_ftp_url = "https://docbox.etsi.org/SET"
        req_data = self.connect()
        print(req_data['message'])
        # Keep the session even when login failed, so later calls can retry.
        self.session = req_data['session']

    def connect(self):
        """Open a new portal session and log in.

        Returns:
            A dict with keys ``error`` (bool), ``session`` (the new
            ``requests.Session``) and ``message`` (human-readable status).
            ``self.session`` is replaced only when the login did not fail.
        """
        session = requests.Session()
        session.headers.update(self.HEADERS)
        session.proxies.update(_get_proxies())

        # Seed DNN session cookies — docbox requires the portal session to be
        # initialised with domain=docbox.etsi.org so the .DOTNETNUKE cookie
        # is scoped to .etsi.org and accepted by docbox.etsi.org as well.
        login_redir_url = (
            "https://portal.etsi.org/LoginRedirection.aspx"
            "?domain=docbox.etsi.org&ReturnUrl=/"
        )
        session.get(login_redir_url, verify=False, timeout=15)

        req = session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=json.dumps({"username": os.environ.get("EOL_USER"),
                             "password": os.environ.get("EOL_PASSWORD")}),
            headers={"Content-Type": "application/json; charset=UTF-8",
                     "Referer": login_redir_url},
            verify=False,
            allow_redirects=False,
            timeout=15,
        )
        if req.text == "Failed":
            return {"error": True, "session": session, "message": "Login failed ! Check your credentials"}

        # Always update self.session so reconnect and reauth actually take effect
        self.session = session
        return {"error": False, "session": session, "message": "Login successful"}

    def _get(self, url: str, timeout: int):
        """GET *url* with the current session, re-authenticating once if needed.

        When the final response URL contains ``LoginRedirection`` the portal
        login page was served instead of the requested resource, so a fresh
        login is attempted and the request is repeated a single time.
        """
        resp = self.session.get(url, verify=False, timeout=timeout, allow_redirects=True)
        if resp.url and "LoginRedirection" in resp.url:
            self.connect()  # replaces self.session when the login succeeds
            resp = self.session.get(url, verify=False, timeout=timeout, allow_redirects=True)
        return resp

    def download_document(self, url: str) -> bytes:
        """Download a docbox file using the authenticated session.

        If the session has expired the portal redirects to LoginRedirection —
        this is detected and the request is retried after re-authenticating.

        Returns:
            The raw response body.
        """
        return self._get(url, timeout=30).content

    def get_workgroup(self, doc: str):
        """Derive the docbox folder and year from a contribution id.

        Args:
            doc: A document id such as ``SETREQ(24)000123``.

        Returns:
            ``(main_tsg, workgroup, doc)`` where ``main_tsg`` is one of
            ``SET-WG-R``, ``SET-WG-T`` or ``SET`` and ``workgroup`` is
            ``"20"`` followed by the text found between the first pair of
            parentheses. ``(None, None, None)`` is returned when the prefix
            is not recognised or the id carries no parenthesised group.
        """
        # Order matters: the more specific prefixes must be tested first.
        if doc.startswith(("SETREQ", "SCPREQ")):
            main_tsg = "SET-WG-R"
        elif doc.startswith(("SETTEC", "SCPTEC")):
            main_tsg = "SET-WG-T"
        elif doc.startswith(("SET", "SCP")):
            main_tsg = "SET"
        else:
            return None, None, None

        year_match = re.search(r'\(([^)]+)\)', doc)
        if year_match is None:
            # No "(YY)" group: the folder cannot be determined.
            return None, None, None
        return main_tsg, "20" + year_match.group(1), doc

    def find_workgroup_url(self, main_tsg, workgroup):
        """Return the URL of the contributions folder for *workgroup*.

        The ``05-CONTRIBUTIONS`` listing of *main_tsg* is scanned for a table
        row whose link text contains *workgroup*; when none is found the URL
        is built directly from *workgroup*.
        """
        base = f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS"
        # _get re-authenticates and retries once on a login redirect.
        response = self._get(base, timeout=15)
        soup = BeautifulSoup(response.text, 'html.parser')
        for row in soup.find_all("tr"):
            link = row.find("a")
            if link and workgroup in link.get_text():
                return f"{base}/{link.get_text()}"
        return f"{base}/{workgroup}"

    def get_docs_from_url(self, url):
        """Return the link texts found in the table cells of a listing page.

        Any error while fetching or parsing is printed and an empty list is
        returned (best-effort behaviour).
        """
        try:
            response = self._get(url, timeout=15)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def search_document(self, doc_id: str):
        """Return the URL of the contribution *doc_id*.

        The workgroup folder is listed and, for entries that look like
        sub-directories (no dot in the name), one level below it. Names are
        compared case-insensitively. When several entries match, the last
        one found is returned.

        Returns:
            The document URL, or ``"Document <doc_id> not found"``.
        """
        not_found = f"Document {doc_id} not found"
        main_tsg, workgroup, doc = self.get_workgroup(doc_id)
        if not main_tsg:
            return not_found

        wg_url = self.find_workgroup_url(main_tsg, workgroup)
        print(wg_url)
        entries = self.get_docs_from_url(wg_url)
        print(entries)

        needle = doc.lower()  # both sides lower-cased for a real case-insensitive match
        urls = []
        for entry in entries:
            if needle in entry.lower():
                urls.append(f"{wg_url}/{entry}")
            elif "." not in entry.rstrip("/"):
                # looks like a subdirectory — go one level deeper
                sub_url = f"{wg_url}/{entry}"
                for name in self.get_docs_from_url(sub_url):
                    if needle in name.lower():
                        print(name)
                        urls.append(f"{sub_url}/{name}")

        return urls[-1] if urls else not_found
class ETSISpecFinder:
    """Locate published ETSI TS/TR specifications.

    PDFs are looked up in the public ``deliver`` directory listings; DOCX
    files are fetched through the ETSI portal using the ``EOL_USER`` and
    ``EOL_PASSWORD`` environment variables.
    """

    # NOTE(review): every request issued by this class passes verify=False,
    # i.e. TLS certificate verification is disabled — confirm this is
    # intentional for the deployment environment.

    def __init__(self):
        self.main_url = "https://www.etsi.org/deliver/etsi_ts"
        self.second_url = "https://www.etsi.org/deliver/etsi_tr"
        self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}

    def get_spec_path(self, doc_id: str):
        """Return the ``<range>/<folder>`` path fragment for a spec number.

        Args:
            doc_id: Spec number with optional dash-separated part numbers,
                e.g. ``"103 666"``, ``"103 666-2"`` or ``"101 376-5-3"``.

        Returns:
            ``"103600_103699/10366602"`` for ``"103 666-2"``: the hundred
            range containing the number, then the number followed by each
            part zero-padded to two digits.

        Raises:
            ValueError: if the number before the first dash is not an integer.
        """
        number, *raw_parts = doc_id.split("-")
        number = number.replace(" ", "")
        # Empty parts (e.g. a trailing dash) are ignored; single digits are padded.
        suffix = "".join(p.strip().zfill(2) for p in raw_parts if p.strip())
        value = int(number)
        low = value - value % 100
        return f"{low}_{low + 99}/{number}{suffix}"

    def get_docs_from_url(self, url):
        """Return the link texts of a directory listing, skipping the first link.

        Any error while fetching or parsing is printed and an empty list is
        returned (best-effort behaviour).
        """
        try:
            response = requests.get(url, verify=False, timeout=15, proxies=_get_proxies())
            soup = BeautifulSoup(response.text, "html.parser")
            # The first anchor is dropped — presumably the parent-directory link.
            return [item.get_text() for item in soup.find_all("a")][1:]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def _normalise_version(self, version: str) -> str:
        """Normalise a user-supplied version string to ETSI zero-padded format.

        '17.6.0' -> '17.06.00' (the '_60' release suffix is ignored during matching)
        Already-normalised strings like '17.06.00' are returned unchanged.
        Strings that are not three dot-separated integers are returned with
        surrounding slashes stripped.
        """
        cleaned = version.strip("/")
        parts = cleaned.split(".")
        if len(parts) == 3:
            try:
                return ".".join(f"{int(p):02d}" for p in parts)
            except ValueError:
                pass
        return cleaned

    def _unpadded_version(self, version: str) -> str:
        """Return *version* without zero padding ('17.06.00' -> '17.6.0').

        Strings that are not three dot-separated integers are returned as given.
        """
        parts = version.strip("/").split(".")
        if len(parts) == 3:
            try:
                return ".".join(str(int(p)) for p in parts)
            except ValueError:
                pass
        return version

    def _pick_release(self, releases: list, version: str = None) -> str:
        """Return the release folder matching version, or the latest if not found/specified.

        "Latest" is the last element of *releases*; the caller must pass a
        non-empty list.
        """
        if version:
            target = self._normalise_version(version)
            for release in releases:
                # folder names are like '17.06.00_60'; match on the part before '_'
                if release.strip("/").split("_")[0] == target:
                    return release
        return releases[-1]

    def search_document(self, doc_id: str, version: str = None):
        """Return the URL of the first PDF found for a specification.

        The TS tree is searched first, then the TR tree.

        Args:
            doc_id: Spec number, e.g. ``"103 666"`` or ``"103 666-2"``.
            version: Optional version to select; the last listed release is
                used when it is omitted or not found.

        Returns:
            The PDF URL, or ``"Specification <doc_id> not found"``.
        """
        # Example : 103 666[-2 opt]
        spec_path = self.get_spec_path(doc_id)
        bases = [f"{root}/{spec_path}/" for root in (self.main_url, self.second_url)]
        for base in bases:
            print(base)

        for base in bases:
            releases = self.get_docs_from_url(base)
            if not releases:
                continue
            # Strip slashes so the result never contains '//' and the version
            # folder is always the second-to-last path segment.
            release = self._pick_release(releases, version).strip("/")
            for name in self.get_docs_from_url(base + release):
                if name.lower().endswith(".pdf"):
                    return f"{base}{release}/{name}"

        return f"Specification {doc_id} not found"

    def _get_wki_id_candidates(self, doc_id: str, version: str = None) -> list:
        """Return a list of candidate wki_ids for a spec version (best match first).

        When *version* is omitted it is derived from the folder name in the
        PDF URL returned by ``search_document``. An empty list is returned
        when no version can be determined or nothing is found.
        """
        if version:
            # Use the same un-padded form as the derived branch below.
            version_str = self._unpadded_version(version)
        else:
            # Derive version from the FTP PDF URL
            pdf_url = self.search_document(doc_id)
            if "not found" in pdf_url.lower():
                return []
            version_folder = pdf_url.rstrip("/").split("/")[-2]  # e.g. "18.04.00_60"
            v_parts = version_folder.split("_")[0].split(".")  # ["18", "04", "00"]
            try:
                version_str = f"{int(v_parts[0])}.{int(v_parts[1])}.{int(v_parts[2])}"
            except (ValueError, IndexError):
                return []

        candidates = []
        for spec_type in ["TS", "TR"]:
            params = {
                "option": "com_standardssearch",
                "view": "data",
                "format": "json",
                "page": "1",
                "search": f"ETSI {spec_type} {doc_id} v{version_str}",
                "etsiNumber": "1",
                "published": "1",
            }
            try:
                resp = requests.get("https://www.etsi.org/", params=params,
                                    headers=self.headers, verify=False, timeout=15,
                                    proxies=_get_proxies())
                data = resp.json()
                if data and isinstance(data, list):
                    candidates.extend(str(item["wki_id"]) for item in data if "wki_id" in item)
            except Exception as e:
                # Best-effort: a failed lookup for one type must not abort the other.
                print(f"Error getting wki_id for {doc_id}: {e}")
        return candidates

    def _authenticate_eol(self, wki_id: str) -> "requests.Session":
        """Create a requests.Session authenticated to the ETSI EOL portal.

        The session is returned even when the portal answers ``Failed``; a
        message is printed in that case and later requests will show the
        failure.
        """
        session = requests.Session()
        session.headers.update({"User-Agent": self.headers["User-Agent"]})
        session.proxies.update(_get_proxies())

        login_redir_url = (
            f"https://portal.etsi.org/LoginRedirection.aspx"
            f"?ReturnUrl=%2fwebapp%2fprotect%2fNTaccount.asp%3fWki_Id%3d{wki_id}"
            f"&Wki_Id={wki_id}"
        )
        # Seed DNN session cookies
        session.get(login_redir_url, verify=False, timeout=15)

        # Authenticate via EOL JSON login
        login = session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=json.dumps({"username": os.environ.get("EOL_USER"),
                             "password": os.environ.get("EOL_PASSWORD")}),
            headers={"Content-Type": "application/json; charset=UTF-8",
                     "Referer": login_redir_url},
            verify=False,
            allow_redirects=False,
            timeout=15,
        )
        if login.text == "Failed":
            print(f"  wki_id={wki_id}: EOL login failed ! Check your credentials")
        return session

    def search_document_docx(self, doc_id: str, version: str = None) -> str:
        """Download an ETSI spec as DOCX and return the local file path.

        Each wki_id candidate is tried in turn until one yields a DOCX link
        that downloads successfully; the file is written under ``/tmp``.

        Returns:
            The path of the written file, or a message string explaining
            that the specification was not found or that every candidate
            was rejected.
        """
        candidates = self._get_wki_id_candidates(doc_id, version)
        if not candidates:
            return f"Specification {doc_id} not found"

        for wki_id in candidates:
            print(f"Trying wki_id={wki_id} for {doc_id}")
            session = self._authenticate_eol(wki_id)

            # NTaccount.asp → parse profile_id from meta-refresh
            r = session.get(
                f"https://portal.etsi.org/webapp/protect/NTaccount.asp?Wki_Id={wki_id}",
                verify=False, timeout=15,
            )
            meta_match = re.search(r'URL=([^"\'\s>]+)', r.text)
            if not meta_match:
                print(f"  wki_id={wki_id}: authentication failed, trying next")
                continue
            meta_url = meta_match.group(1)
            if not meta_url.startswith("http"):
                meta_url = f"https://portal.etsi.org/webapp/protect/{meta_url}"

            # CheckIdentifier → 302 to copy_file
            r2 = session.get(meta_url, allow_redirects=False, verify=False, timeout=15)
            if r2.status_code != 302:
                print(f"  wki_id={wki_id}: unexpected status {r2.status_code}, trying next")
                continue
            location2 = r2.headers.get("Location", "")
            # "processError" also matches "processErrors".
            if "processError" in location2:
                print(f"  wki_id={wki_id}: portal rejected ({location2}), trying next")
                continue

            # copy_file (may have a second redirect)
            copy_url = urljoin("https://portal.etsi.org/", location2)
            r3 = session.get(copy_url, allow_redirects=False, verify=False, timeout=15)
            if r3.status_code == 302:
                location3 = r3.headers.get("Location", "")
                final_url = urljoin("https://portal.etsi.org/webapp/ewp/", location3)
                r4 = session.get(final_url, verify=False, timeout=15)
            else:
                r4 = r3

            # Extract DOCX link
            docx_urls = re.findall(r'href=["\']([^"\']*\.docx)["\']', r4.text, re.IGNORECASE)
            if not docx_urls:
                print(f"  wki_id={wki_id}: DOCX not found in page, trying next")
                continue

            # The href may be relative to the page it was found on.
            docx_url = urljoin(r4.url, docx_urls[0])
            dl = session.get(docx_url, headers={"Referer": r4.url}, verify=False, timeout=60)
            if dl.status_code != 200:
                # Do not save an error page as if it were the document.
                print(f"  wki_id={wki_id}: DOCX download failed with status {dl.status_code}, trying next")
                continue

            filename = docx_url.split("/")[-1]
            tmp_path = f"/tmp/{filename}"
            with open(tmp_path, "wb") as f:
                f.write(dl.content)
            print(f"  wki_id={wki_id}: success")
            return tmp_path

        return f"Specification {doc_id}: all {len(candidates)} wki_id candidate(s) rejected by ETSI portal"