# DocFinder / classes.py
# heymenn's picture
# fix etsi connexion cookie expire
# 8f93b05
# raw
# history blame
# 13.7 kB
from fastapi import HTTPException
import requests
import re
from bs4 import BeautifulSoup
import os
import json
def _get_proxies() -> dict:
    """Build the ``proxies`` mapping for ``requests`` from the environment.

    ``$http_proxy`` is consulted first, then ``$HTTP_PROXY``. The first
    non-empty value is used for both the ``http`` and ``https`` schemes.
    When neither variable holds a value, an empty dict is returned.
    """
    for var_name in ("http_proxy", "HTTP_PROXY"):
        proxy_url = os.environ.get(var_name)
        if proxy_url:
            return {"http": proxy_url, "https": proxy_url}
    return {}
class ETSIDocFinder:
    """Find and download contribution documents on the ETSI docbox (SET tree).

    Instantiating the class logs in to the ETSI portal immediately, using the
    ``EOL_USER`` / ``EOL_PASSWORD`` environment variables, and keeps the
    resulting ``requests.Session`` on ``self.session``.

    NOTE(review): every request is sent with ``verify=False``, i.e. TLS
    certificate verification is disabled even while credentials are posted.
    This is kept as-is because it may be required by the deployment's proxy,
    but it should be confirmed as intentional.
    """

    HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}

    def __init__(self):
        self.main_ftp_url = "https://docbox.etsi.org/SET"
        req_data = self.connect()
        print(req_data['message'])
        # Keep the session even when login failed, so later calls still have
        # something to issue requests with.
        self.session = req_data['session']

    def connect(self):
        """Open a fresh session and log in to the ETSI portal.

        Returns:
            A dict with keys ``error`` (bool), ``session`` (the new
            ``requests.Session``) and ``message`` (human-readable status).
            On success ``self.session`` is replaced with the new session.
        """
        session = requests.Session()
        session.headers.update(self.HEADERS)
        session.proxies.update(_get_proxies())

        # Seed DNN session cookies — docbox requires the portal session to be
        # initialised with domain=docbox.etsi.org so the .DOTNETNUKE cookie
        # is scoped to .etsi.org and accepted by docbox.etsi.org as well.
        login_redir_url = (
            "https://portal.etsi.org/LoginRedirection.aspx"
            "?domain=docbox.etsi.org&ReturnUrl=/"
        )
        session.get(login_redir_url, verify=False, timeout=15)

        req = session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=json.dumps({"username": os.environ.get("EOL_USER"),
                             "password": os.environ.get("EOL_PASSWORD")}),
            headers={"Content-Type": "application/json; charset=UTF-8",
                     "Referer": login_redir_url},
            verify=False,
            allow_redirects=False,
            timeout=15,
        )

        if req.text == "Failed":
            return {"error": True, "session": session, "message": "Login failed ! Check your credentials"}

        # Always update self.session so reconnect and reauth actually take effect
        self.session = session
        return {"error": False, "session": session, "message": "Login successful"}

    def _get(self, url: str, timeout: int):
        """GET ``url`` with the current session, re-authenticating once if needed.

        When the final response URL contains ``LoginRedirection`` (the portal
        login page was served instead of the requested resource), ``connect()``
        is called and the request is retried a single time.
        """
        resp = self.session.get(url, verify=False, timeout=timeout, allow_redirects=True)
        if resp.url and "LoginRedirection" in resp.url:
            self.connect()  # replaces self.session on successful login
            resp = self.session.get(url, verify=False, timeout=timeout, allow_redirects=True)
        return resp

    def download_document(self, url: str) -> bytes:
        """Download a docbox file using the authenticated session.

        If the session has expired the portal redirects to LoginRedirection —
        this is detected and the request is retried after re-authenticating.

        Returns:
            The raw response body.
        """
        return self._get(url, timeout=30).content

    def get_workgroup(self, doc: str):
        """Derive the docbox folder and year folder from a document ID.

        Args:
            doc: Document ID such as ``"SETREQ(25)000123"``.

        Returns:
            ``(main_tsg, workgroup, doc)`` where ``main_tsg`` is one of
            ``"SET-WG-R"``, ``"SET-WG-T"`` or ``"SET"`` and ``workgroup`` is
            ``"20"`` followed by the text found between parentheses.
            ``(None, None, None)`` when the prefix is not recognised or the
            ID contains no parenthesised part.
        """
        # Order matters: the more specific prefixes must be tested first.
        if doc.startswith(("SETREQ", "SCPREQ")):
            main_tsg = "SET-WG-R"
        elif doc.startswith(("SETTEC", "SCPTEC")):
            main_tsg = "SET-WG-T"
        elif doc.startswith(("SET", "SCP")):
            main_tsg = "SET"
        else:
            return None, None, None

        year_match = re.search(r'\(([^)]+)\)', doc)
        if year_match is None:
            # No "(yy)" part: the year folder cannot be derived.
            return None, None, None
        workgroup = "20" + year_match.group(1)
        return main_tsg, workgroup, doc

    def find_workgroup_url(self, main_tsg, workgroup):
        """Return the URL of the contributions folder for ``workgroup``.

        The ``05-CONTRIBUTIONS`` listing of ``main_tsg`` is scanned for a row
        whose link text contains ``workgroup``. If none is found, the URL is
        built from ``workgroup`` directly.
        """
        url = f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS"
        response = self._get(url, timeout=15)
        soup = BeautifulSoup(response.text, 'html.parser')
        for item in soup.find_all("tr"):
            link = item.find("a")
            if link and workgroup in link.get_text():
                return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{link.get_text()}"

        return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{workgroup}"

    def get_docs_from_url(self, url):
        """Return the link texts found in the table cells of a listing page.

        Any exception is reported on stdout and results in an empty list, so
        a single unreachable folder does not abort a search.
        """
        try:
            response = self._get(url, timeout=15)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    @staticmethod
    def _entry_matches(entry: str, doc_id: str) -> bool:
        """Tell whether listing entry ``entry`` refers to ``doc_id`` (case-insensitive)."""
        return doc_id.lower() in entry.lower()

    def search_document(self, doc_id: str):
        """Search the docbox for ``doc_id`` and return the URL of the match.

        The year folder is listed, and entries without a dot in their name are
        treated as sub-directories and listed one level deeper.

        Returns:
            The URL of the matching entry (the last one when several match),
            or the string ``"Document <doc_id> not found"``.
        """
        main_tsg, workgroup, doc = self.get_workgroup(doc_id)
        urls = []
        if main_tsg:
            wg_url = self.find_workgroup_url(main_tsg, workgroup)
            print(wg_url)
            if wg_url:
                entries = self.get_docs_from_url(wg_url)
                print(entries)
                for entry in entries:
                    if self._entry_matches(entry, doc):
                        urls.append(f"{wg_url}/{entry}")
                    elif "." not in entry.rstrip("/"):
                        # looks like a subdirectory — go one level deeper
                        sub_url = f"{wg_url}/{entry}"
                        for f in self.get_docs_from_url(sub_url):
                            if self._entry_matches(f, doc):
                                print(f)
                                urls.append(f"{sub_url}/{f}")
        return urls[-1] if urls else f"Document {doc_id} not found"
class ETSISpecFinder:
    """Locate published ETSI specifications (TS / TR) and fetch them.

    PDF lookups go through the public ``www.etsi.org/deliver`` tree. DOCX
    downloads go through the ETSI portal and need the ``EOL_USER`` /
    ``EOL_PASSWORD`` environment variables.

    NOTE(review): every request is sent with ``verify=False``, i.e. TLS
    certificate verification is disabled, including for the credential POST.
    Kept as-is; confirm that this is intentional.
    """

    def __init__(self):
        self.main_url = "https://www.etsi.org/deliver/etsi_ts"
        self.second_url = "https://www.etsi.org/deliver/etsi_tr"
        self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}

    def get_spec_path(self, doc_id: str):
        """Build the ``<range>/<folder>`` path segment for a spec number.

        Args:
            doc_id: Spec number such as ``"103 666"``, ``"103 666-2"`` or a
                multi-part number such as ``"101 376-1-1"``.

        Returns:
            ``"<lo>_<lo+99>/<number><parts>"`` where ``lo`` is the number
            rounded down to a multiple of 100 and each part is zero-padded to
            at least two digits, e.g. ``"103600_103699/10366602"``.

        Raises:
            ValueError: if the number before the first ``-`` is not numeric.
        """
        number, *parts = doc_id.split("-")
        number = number.replace(" ", "")
        # Empty parts (e.g. a trailing "-") contribute nothing to the folder.
        suffix = "".join(p.strip().zfill(2) for p in parts if p.strip())
        range_start = int(number) - int(number) % 100
        return f"{range_start}_{range_start + 99}/{number}{suffix}"

    def get_docs_from_url(self, url):
        """Return the link texts of a listing page, skipping the first link.

        Any exception is reported on stdout and results in an empty list.
        """
        try:
            response = requests.get(url, verify=False, timeout=15, proxies=_get_proxies())
            soup = BeautifulSoup(response.text, "html.parser")
            docs = [item.get_text() for item in soup.find_all("a")][1:]
            return docs
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def _normalise_version(self, version: str) -> str:
        """Normalise a user-supplied version string to ETSI zero-padded format.

        '17.6.0' -> '17.06.00' (the '_60' release suffix is ignored during matching)
        Already-normalised strings like '17.06.00' are returned unchanged.
        Strings that are not three dot-separated integers are returned with
        surrounding slashes removed.
        """
        parts = version.strip("/").split(".")
        if len(parts) == 3:
            try:
                return f"{int(parts[0]):02d}.{int(parts[1]):02d}.{int(parts[2]):02d}"
            except ValueError:
                pass
        return version.strip("/")

    @staticmethod
    def _portal_version(version: str) -> str:
        """Strip zero padding from a version: '17.06.00' -> '17.6.0'.

        This is the form ``_get_wki_id`` puts in its search string. Strings
        that are not three dot-separated integers are returned unchanged.
        """
        parts = version.strip("/").split(".")
        if len(parts) == 3:
            try:
                return ".".join(str(int(p)) for p in parts)
            except ValueError:
                pass
        return version

    def _pick_release(self, releases: list, version: str = None) -> str:
        """Return the release folder matching version, or the latest if not found/specified.

        "Latest" is the last element of ``releases``; callers must pass a
        non-empty list.
        """
        if version:
            target = self._normalise_version(version)
            for r in releases:
                # folder names are like '17.06.00_60'; match on the part before '_'
                folder = r.strip("/").split("_")[0]
                if folder == target:
                    return r
        return releases[-1]

    def search_document(self, doc_id: str, version: str = None):
        """Return the URL of the PDF for a spec, trying TS first and then TR.

        Args:
            doc_id: Spec number, e.g. ``"103 666"`` or ``"103 666-2"``.
            version: Optional version; the last listed release is used when it
                is omitted or not found.

        Returns:
            The PDF URL, or the string ``"Specification <doc_id> not found"``.
        """
        spec_path = self.get_spec_path(doc_id)
        base_urls = [f"{self.main_url}/{spec_path}/", f"{self.second_url}/{spec_path}/"]
        for base in base_urls:
            print(base)

        for base in base_urls:
            releases = self.get_docs_from_url(base)
            if not releases:
                continue
            # Drop any slash around the folder name so the result never
            # contains "//" (which would break version extraction from the URL).
            release = self._pick_release(releases, version).strip("/")
            for f in self.get_docs_from_url(base + release):
                if f.endswith(".pdf"):
                    return base + release + "/" + f

        return f"Specification {doc_id} not found"

    def _get_wki_id(self, doc_id: str, version: str = None) -> str:
        """Return the ETSI portal wki_id for a spec version, or None if not found."""
        if version:
            # Use the same unpadded form as the derived branch below.
            version_str = self._portal_version(version)
        else:
            # Derive version from the FTP PDF URL
            pdf_url = self.search_document(doc_id)
            if "not found" in pdf_url.lower():
                return None
            # URL path: .../18.04.00_60/ts_...p.pdf → folder is parts[-2]
            parts = pdf_url.rstrip("/").split("/")
            version_folder = parts[-2]  # e.g. "18.04.00_60"
            v_parts = version_folder.split("_")[0].split(".")  # ["18", "04", "00"]
            try:
                version_str = f"{int(v_parts[0])}.{int(v_parts[1])}.{int(v_parts[2])}"
            except (ValueError, IndexError):
                return None

        for spec_type in ["TS", "TR"]:
            params = {
                "option": "com_standardssearch",
                "view": "data",
                "format": "json",
                "page": "1",
                "search": f"ETSI {spec_type} {doc_id} v{version_str}",
                "etsiNumber": "1",
                "published": "1",
            }
            try:
                resp = requests.get("https://www.etsi.org/", params=params,
                                    headers=self.headers, verify=False, timeout=15,
                                    proxies=_get_proxies())
                data = resp.json()
                if data and isinstance(data, list):
                    return str(data[0]["wki_id"])
            except Exception as e:
                # Best effort: report and fall through to the next spec type.
                print(f"Error getting wki_id for {doc_id}: {e}")
        return None

    def _authenticate_eol(self, wki_id: str) -> "requests.Session":
        """Create a requests.Session authenticated to the ETSI EOL portal.

        The login response is not inspected here; the caller detects a failed
        login from the pages it fetches afterwards.
        """
        session = requests.Session()
        session.headers.update({"User-Agent": self.headers["User-Agent"]})
        session.proxies.update(_get_proxies())

        login_redir_url = (
            f"https://portal.etsi.org/LoginRedirection.aspx"
            f"?ReturnUrl=%2fwebapp%2fprotect%2fNTaccount.asp%3fWki_Id%3d{wki_id}"
            f"&Wki_Id={wki_id}"
        )
        # Seed DNN session cookies
        session.get(login_redir_url, verify=False, timeout=15)

        # Authenticate via EOL JSON login
        session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=json.dumps({"username": os.environ.get("EOL_USER"),
                             "password": os.environ.get("EOL_PASSWORD")}),
            headers={"Content-Type": "application/json; charset=UTF-8",
                     "Referer": login_redir_url},
            verify=False,
            allow_redirects=False,
            timeout=15,
        )
        return session

    def search_document_docx(self, doc_id: str, version: str = None) -> str:
        """Download an ETSI spec as DOCX and return the local file path.

        Returns:
            The path of the file written under ``/tmp``, or a
            ``"Specification <doc_id> ..."`` message when a step of the
            portal download chain fails.
        """
        wki_id = self._get_wki_id(doc_id, version)
        if not wki_id:
            return f"Specification {doc_id} not found"

        session = self._authenticate_eol(wki_id)

        # NTaccount.asp → parse profile_id from meta-refresh
        r = session.get(
            f"https://portal.etsi.org/webapp/protect/NTaccount.asp?Wki_Id={wki_id}",
            verify=False, timeout=15,
        )
        meta_match = re.search(r'URL=([^"\'\s>]+)', r.text)
        if not meta_match:
            return f"Specification {doc_id}: authentication failed"

        meta_url = meta_match.group(1)
        if not meta_url.startswith("http"):
            meta_url = f"https://portal.etsi.org/webapp/protect/{meta_url}"

        # CheckIdentifier → 302 to copy_file
        r2 = session.get(meta_url, allow_redirects=False, verify=False, timeout=15)
        if r2.status_code != 302:
            return f"Specification {doc_id}: download chain failed"

        # copy_file (may have a second redirect)
        copy_url = "https://portal.etsi.org" + r2.headers["Location"]
        r3 = session.get(copy_url, allow_redirects=False, verify=False, timeout=15)

        if r3.status_code == 302:
            final_url = "https://portal.etsi.org/webapp/ewp/" + r3.headers["Location"]
            r4 = session.get(final_url, verify=False, timeout=15)
        else:
            r4 = r3

        # Extract DOCX link
        docx_urls = re.findall(r'href=["\']([^"\']*\.docx)["\']', r4.text, re.IGNORECASE)
        if not docx_urls:
            return f"Specification {doc_id}: DOCX not available"

        docx_url = docx_urls[0]

        # Download
        dl = session.get(docx_url, headers={"Referer": r4.url}, verify=False, timeout=60)
        filename = docx_url.split("/")[-1]
        tmp_path = f"/tmp/{filename}"
        with open(tmp_path, "wb") as f:
            f.write(dl.content)

        return tmp_path