| | import asyncio |
| | from aiolimiter import AsyncLimiter |
| | from pathlib import Path |
| | import traceback |
| | from typing import Literal, Tuple |
| | from fastapi.routing import APIRouter |
| | import logging |
| | import io |
| | import zipfile |
| | import os |
| | from httpx import AsyncClient |
| | from pydantic import BaseModel |
| | import subprocess |
| | import pandas as pd |
| | import re |
| | import tempfile |
| | from lxml import etree |
| | from bs4 import BeautifulSoup |
| | from fastapi import Depends, HTTPException |
| | from dependencies import get_http_client, get_llm_router |
| | from fastapi.responses import StreamingResponse |
| | from litellm.router import Router |
| | from kreuzberg import ExtractionConfig, extract_bytes |
| |
|
| | from schemas import GetMeetingDocsRequest, GetMeetingDocsResponse, DocRequirements, DownloadDocsRequest, GetMeetingsRequest, GetMeetingsResponse, ExtractRequirementsRequest, ExtractRequirementsResponse |
| |
|
| | |
# Router grouping every endpoint defined in this module under one OpenAPI tag.
router = APIRouter(tags=["document extraction"])

# Namespace prefixes used by the XPath queries run against word/document.xml.
NSMAP = {
    'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
    'v': 'urn:schemas-microsoft-com:vml'
}

# Shared kreuzberg extraction settings: OCR is not forced and no OCR backend is set.
KREUZBERG_CONFIG: ExtractionConfig = ExtractionConfig(
    force_ocr=False, ocr_backend=None)
| |
|
| | |
| | LO_CONVERSION_MUTEX = asyncio.Semaphore(1) |
| |
|
| |
|
| | async def convert_file(contents: io.BytesIO, filename: str, input_ext: str, output_ext: str, filter: str = None) -> io.BytesIO: |
| | """ |
| | Converts the given file bytes using Libreoffice headless to the specified file type. |
| | This is an asynchronous version. |
| | |
| | Args: |
| | contents: File contents |
| | filename: File base name WITHOUT THE EXTENSION |
| | input_ext: Input extension (WITHOUT THE DOT) |
| | output_ext: Output extension (WITHOUT THE DOT) |
| | filter: The conversion filter to use. |
| | """ |
| |
|
| | await LO_CONVERSION_MUTEX.acquire() |
| |
|
| | with tempfile.TemporaryDirectory() as tmpdir: |
| | dir_path = Path(tmpdir) |
| | input_file_path = dir_path / f"{filename}.{input_ext}" |
| | output_file_path = dir_path / f"{filename}.{output_ext}" |
| |
|
| | |
| | with open(input_file_path, "wb") as in_file: |
| | in_file.write(contents.read()) |
| |
|
| | out_bytes = io.BytesIO() |
| |
|
| | |
| | command = [ |
| | "libreoffice", |
| | "--headless", |
| | "--convert-to", f"{output_ext}:{filter}" if filter else output_ext, |
| | "--outdir", tmpdir, |
| | str(input_file_path) |
| | ] |
| |
|
| | |
| | process = await asyncio.create_subprocess_exec( |
| | *command, |
| | stdout=asyncio.subprocess.PIPE, |
| | stderr=asyncio.subprocess.PIPE |
| | ) |
| |
|
| | stdout, stderr = await process.communicate() |
| |
|
| | exit_code = await process.wait() |
| |
|
| | if exit_code != 0 and not output_file_path.exists(): |
| | raise subprocess.CalledProcessError( |
| | exit_code, |
| | command, |
| | output=stdout, |
| | stderr=stderr |
| | ) |
| |
|
| | LO_CONVERSION_MUTEX.release() |
| |
|
| | with open(output_file_path, mode="rb") as out: |
| | out_bytes.write(out.read()) |
| |
|
| | out_bytes.seek(0) |
| | return out_bytes |
| |
|
| |
|
| | |
# Rate limiter for archive downloads, configured with max_rate=60 per 60 seconds.
FTP_DOWNLOAD_RATE_LIMITER = AsyncLimiter(max_rate=60, time_period=60)
# Caps the number of archive downloads in flight at the same time to 4.
FTP_MAX_PARALLEL_WORKERS = asyncio.Semaphore(4)


async def get_doc_archive(url: str, client: AsyncClient) -> tuple[str, str, io.BytesIO]:
    """
    Download the ZIP archive at `url` and return its first file entry.

    Args:
        url: URL of the archive; must end with "zip".
        client: HTTP client used to perform the download.

    Returns:
        A tuple (name, extension, contents) where `name` is the entry's path
        inside the archive without its extension, `extension` is lower-cased
        and includes the leading dot, and `contents` is a BytesIO with the
        entry's bytes.

    Raises:
        ValueError: The URL does not end with "zip", or the archive contains
            no file entry.
        httpx.HTTPStatusError: The server answered with an error status.
    """
    # Validate before taking a rate-limiter slot so bad URLs cost nothing.
    if not url.endswith("zip"):
        raise ValueError("URL doit pointer vers un fichier ZIP")

    # Only the network request needs to be throttled.
    async with FTP_DOWNLOAD_RATE_LIMITER, FTP_MAX_PARALLEL_WORKERS:
        resp = await client.get(url, headers={
            "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

    resp.raise_for_status()

    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        # Return the first entry that is an actual file, skipping directories.
        for entry in zf.infolist():
            if entry.is_dir():
                continue

            file_name = entry.filename
            root, ext = os.path.splitext(file_name)
            return (root, ext.lower(), io.BytesIO(zf.read(file_name)))

    raise ValueError("Aucun fichier trouvé dans l'archive")
| |
|
| |
|
def apply_docx_revisions(docx_zip: zipfile.ZipFile) -> io.BytesIO:
    """
    Accept the tracked revisions of a .docx and return the resulting document.

    Every `w:del` element is dropped, every `w:ins` element is replaced by its
    children, and comment markers (`w:commentRangeStart`, `w:commentRangeEnd`,
    `w:commentReference`) are removed from word/document.xml. All other
    archive members are copied unchanged.

    Args:
        docx_zip: The Word document opened as a zip archive.

    Returns:
        A BytesIO positioned at offset 0 holding the rewritten .docx archive.

    Raises:
        FileNotFoundError: The archive has no word/document.xml member.
    """
    try:
        xml_bytes = docx_zip.read('word/document.xml')
    except KeyError as exc:
        raise FileNotFoundError(
            "word/document.xml not found in the DOCX archive.") from exc

    parser = etree.XMLParser(remove_blank_text=True)
    root = etree.fromstring(xml_bytes, parser=parser)

    # Accept deletions: the deleted content simply disappears.
    for del_elem in root.xpath('//w:del', namespaces=NSMAP):
        parent = del_elem.getparent()
        if parent is not None:
            parent.remove(del_elem)

    # Accept insertions: hoist the inserted content into the parent, in order,
    # at the position of the <w:ins> wrapper, then drop the wrapper.
    for ins_elem in root.xpath('//w:ins', namespaces=NSMAP):
        parent = ins_elem.getparent()
        if parent is None:
            continue
        index = parent.index(ins_elem)
        # Snapshot the children first: inserting a child elsewhere moves it
        # out of <w:ins>, which would mutate the sequence being iterated.
        for offset, child in enumerate(list(ins_elem)):
            parent.insert(index + offset, child)
        parent.remove(ins_elem)

    # Strip comment anchors and references from the body.
    for tag in ['w:commentRangeStart', 'w:commentRangeEnd', 'w:commentReference']:
        for elem in root.xpath(f'//{tag}', namespaces=NSMAP):
            parent = elem.getparent()
            if parent is not None:
                parent.remove(elem)

    output = io.BytesIO()

    with zipfile.ZipFile(output, 'w', compression=zipfile.ZIP_DEFLATED) as new_zip:
        # Copy every member except the document body as-is.
        for file_info in docx_zip.infolist():
            if file_info.filename != 'word/document.xml':
                new_zip.writestr(file_info, docx_zip.read(file_info.filename))

        # Write the revised document body.
        xml_str = etree.tostring(
            root,
            xml_declaration=True,
            encoding='UTF-8',
            pretty_print=True
        )
        new_zip.writestr('word/document.xml', xml_str)

    output.seek(0)
    return output
| |
|
| |
|
# MIME types passed to the text extractor, keyed by lower-cased file extension.
FORMAT_MIME_TYPES = {
    ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    ".pdf": "application/pdf",
    ".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation"
}


async def doc_to_txt(doc_id: str, url: str, client: AsyncClient) -> list[str]:
    """
    Download the specified TDoc archive and convert its document to text.

    `.doc` files are first converted to `.docx` through LibreOffice; `.docx`
    files have their tracked revisions applied before extraction; any other
    extension listed in FORMAT_MIME_TYPES is extracted directly.

    Args:
        doc_id: Identifier of the document (used for logging and as the
            temporary file name during `.doc` conversion).
        url: URL of the ZIP archive containing the document.
        client: HTTP client used to download the archive.

    Returns:
        The non-empty lines of the extracted text, each stripped of
        surrounding whitespace.

    Raises:
        ValueError: The archive's file has an unsupported extension (or the
            archive itself is rejected by the download step).
    """
    filename, ext, doc_bytes = await get_doc_archive(url, client)

    if ext == ".doc":
        logging.debug(f"Converting {filename} .doc --> .docx")
        converted = await convert_file(doc_bytes, doc_id, "doc", "docx")
        payload, mime_type = converted.read(), FORMAT_MIME_TYPES[".docx"]
    elif ext == ".docx":
        logging.debug(f"Updating .docx revisions for {doc_id}.")
        # Close the archive once the revised copy has been produced.
        with zipfile.ZipFile(doc_bytes) as docx_zip:
            revised = apply_docx_revisions(docx_zip)
        payload, mime_type = revised.read(), FORMAT_MIME_TYPES[".docx"]
    elif ext in FORMAT_MIME_TYPES:
        payload, mime_type = doc_bytes.read(), FORMAT_MIME_TYPES[ext]
    else:
        raise ValueError(
            f"Unsupported file type: {ext}, filename: {filename}")

    extracted_data = await extract_bytes(payload, mime_type, config=KREUZBERG_CONFIG)

    # Drop blank lines and trim the remaining ones.
    return [line.strip()
            for line in extracted_data.content.splitlines() if line.strip()]
| |
|
| |
|
| | |
| |
|
@router.post("/get_meetings", response_model=GetMeetingsResponse)
async def get_meetings(req: GetMeetingsRequest, http_client: AsyncClient = Depends(get_http_client)):
    """
    Retrieves the list of meetings for the given working group.

    The working group (e.g. letters followed by a digit) is split into its TSG
    prefix and its group number, the matching "wg<number>" folder is looked up
    in the TSG directory listing on the 3GPP FTP site, and the meeting folders
    found there are returned as a mapping of display name -> folder name.

    Returns an empty mapping when no folder matches the working group number.

    Raises:
        HTTPException: 400 when the working group contains no digit.
    """
    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    wg_match = re.search(r"\d", working_group)
    if wg_match is None:
        raise HTTPException(
            status_code=400, detail="Working group must contain a group number")
    wg_number = wg_match.group(0)

    # logging's first argument is a format string; pass the values as args.
    logging.debug("%s %s", tsg, wg_number)
    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    logging.debug(url)

    ftp_request = await http_client.get(url)
    soup = BeautifulSoup(ftp_request.text, "html.parser")
    wg_folders = [item.get_text() for item in soup.select("tr td a")]

    # First listed folder whose name mentions this working group number.
    wg_marker = "wg" + wg_number
    selected_folder = next(
        (folder for folder in wg_folders if wg_marker in folder.lower()), None)

    # Nothing to list: bail out before building a URL from a missing folder.
    if selected_folder is None:
        return GetMeetingsResponse(meetings={})

    url += "/" + selected_folder
    logging.debug(url)

    resp = await http_client.get(url)
    soup = BeautifulSoup(resp.text, "html.parser")
    link_texts = [item.get_text() for item in soup.select("tr td a")]
    meeting_folders = [
        text for text in link_texts
        if text.startswith("TSG") or (text.startswith("CT") and "-" in text)
    ]

    def _display_name(folder: str) -> str:
        """Build the human-readable meeting name for a meeting folder."""
        if folder.startswith("TSG"):
            # Keep what follows the first underscore; partition() yields ""
            # instead of raising when the folder has no underscore.
            suffix = folder.partition("_")[2]
            return working_group + "#" + suffix.replace("_", " ").replace("-", " ")
        return folder.replace("-", "#")

    all_meetings = [_display_name(folder) for folder in meeting_folders]

    return GetMeetingsResponse(meetings=dict(zip(all_meetings, meeting_folders)))
| |
|
| | |
| |
|
| |
|
@router.post("/get_meeting_docs", response_model=GetMeetingDocsResponse)
async def get_meeting_docs(req: GetMeetingDocsRequest, http_client: AsyncClient = Depends(get_http_client)) -> GetMeetingDocsResponse:
    """
    Downloads the document list dataframe for a given meeting.

    Locates the working group folder on the 3GPP FTP site, opens the meeting's
    "docs" directory, loads the first .xlsx file listed there and returns the
    uploaded TDocs as records, each with the URL of its ZIP archive.

    Raises:
        HTTPException: 400 when the working group contains no digit; 404 when
            the working group folder or the XLSX document list is not found.
    """
    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    wg_match = re.search(r"\d", working_group)
    if wg_match is None:
        raise HTTPException(
            status_code=400, detail="Working group must contain a group number")
    wg_number = wg_match.group(0)
    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    logging.info("Fetching TDocs dataframe")

    resp = await http_client.get(url)
    soup = BeautifulSoup(resp.text, "html.parser")
    wg_folders = [item.get_text() for item in soup.select("tr td a")]

    # First listed folder whose name mentions this working group number.
    wg_marker = "wg" + wg_number
    selected_folder = next(
        (folder for folder in wg_folders if wg_marker in folder.lower()), None)
    if selected_folder is None:
        raise HTTPException(
            status_code=404, detail="Working group folder has not been found")

    url += "/" + selected_folder + "/" + req.meeting + "/docs"
    resp = await http_client.get(url)
    soup = BeautifulSoup(resp.text, "html.parser")
    files = [item.get_text() for item in soup.select("tr td a")
             if item.get_text().endswith(".xlsx")]

    if not files:
        raise HTTPException(status_code=404, detail="No XLSX has been found")

    # "#" would otherwise be interpreted as a URL fragment delimiter.
    xlsx_url = (url + "/" + files[0]).replace("#", "%23")
    # read_excel downloads and parses synchronously; run it in a worker
    # thread so the event loop keeps serving other requests meanwhile.
    df = await asyncio.to_thread(pd.read_excel, xlsx_url)

    output_columns = ["TDoc", "Title", "Type", "For",
                      "TDoc Status", "Agenda item description"]
    # Keep only TDocs that were actually uploaded. copy() gives us an
    # independent frame so adding the URL column does not write to a slice.
    uploaded_df = df.loc[df["Uploaded"].notna(), output_columns].copy()
    uploaded_df["URL"] = uploaded_df["TDoc"].apply(
        lambda tdoc: f"{url}/{tdoc}.zip")

    records = uploaded_df.fillna("")[output_columns + ["URL"]].to_dict(orient="records")
    return GetMeetingDocsResponse(data=records)
| |
|
| | |
| |
|
| |
|
@router.post("/download_docs")
async def download_docs(req: DownloadDocsRequest, http_client: AsyncClient = Depends(get_http_client)) -> StreamingResponse:
    """
    Download the specified TDocs and zip them in a single archive.

    Each document is converted to text and stored as "<doc_id>.txt". When a
    document cannot be processed, a "failed_<doc_id>.txt" entry containing the
    error message is stored instead, so one failure never aborts the batch.
    """
    document_ids = [doc.document for doc in req.documents]

    logging.info(f"Downloading TDocs: {document_ids}")

    async def _process_single_document(doc_id: str, doc_url: str) -> dict:
        """Convert one document to text; return its id, its content bytes and a "failed" key on error."""
        try:
            text_lines = await doc_to_txt(doc_id, doc_url, http_client)
            content_bytes = "\n".join(text_lines).encode("utf-8")
        except Exception as e:
            # Best effort: report the failure inside the archive.
            logging.warning(
                f"Failed to process document '{doc_id}' from URL '{doc_url}': {e}")
            error_message = f"Document '{doc_id}' text extraction failed: {e}".encode(
                "utf-8")
            return {"doc_id": doc_id, "content": error_message, "failed": True}
        return {"doc_id": doc_id, "content": content_bytes}

    # The helper never raises, so gather() cannot fail on a single document.
    convert_tasks = await asyncio.gather(*[_process_single_document(doc.document, doc.url) for doc in req.documents], return_exceptions=False)

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as zip_file:
        for task in convert_tasks:
            failed = "failed" in task
            # doc_id comes from the request: neutralise path separators so an
            # entry can never point outside the extraction directory.
            doc_id = re.sub(r"[\\/]+", "_", task["doc_id"])
            safe_filename = f"failed_{doc_id}.txt" if failed else f"{doc_id}.txt"
            zip_file.writestr(safe_filename, task["content"])

    zip_buffer.seek(0)

    return StreamingResponse(
        zip_buffer,
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=tdocs.zip"}
    )
| |
|
| | |
| |
|
| |
|
class ProgressUpdate(BaseModel):
    """Defines the structure of a single SSE message."""
    # "progress" for intermediate updates, "complete" for the final message.
    status: Literal["progress", "complete"]
    # Message payload (free-form dictionary).
    data: dict
    # Number of documents in the request.
    total_docs: int
    # Number of documents processed so far.
    processed_docs: int
| |
|
| |
|
@router.post("/extract_requirements/sse")
async def extract_requirements_from_docs(req: ExtractRequirementsRequest, llm_router: Router = Depends(get_llm_router), http_client: AsyncClient = Depends(get_http_client)):
    """
    Extract requirements from the specified xxxxCR docs using a LLM and returns SSE events about the progress of ongoing operations.

    The stream starts with a "progress" event at 0 processed documents, emits
    one "progress" event each time a document finishes, and ends with a
    "complete" event whose data holds every extracted requirement. A document
    that cannot be downloaded/converted, or whose LLM call fails, contributes
    a placeholder entry with an empty requirement list instead of failing the
    whole stream.
    """
    documents = req.documents
    n_docs = len(documents)

    logging.info(
        "Generating requirements for documents: {}".format(req.documents))

    # Limits the number of LLM calls running at the same time.
    concurrency_sema = asyncio.Semaphore(4)

    def prompt(doc_id, full):
        return f"Here's the document whose ID is {doc_id} : {full}\n\nExtract all requirements and group them by context, returning a list of objects where each object includes a document ID, a concise description of the context where the requirements apply (not a chapter title or copied text), and a list of associated requirements; always return the result as a list, even if only one context is found. Remove the errors"

    async def _process_document(doc) -> list[DocRequirements]:
        """Download one document, convert it to text and ask the LLM for its requirements."""
        doc_id = doc.document
        url = doc.url

        # Step 1: fetch the document and turn it into plain text.
        try:
            lines = await doc_to_txt(doc_id, url, http_client)
            full = "\n".join(lines)
        except Exception as e:
            fmt = "".join(traceback.format_exception(e))
            logging.error(f"Failed to process doc {doc_id} : {fmt}")
            return [DocRequirements(document=doc_id, context="Failed to process document", requirements=[])]

        # Step 2: query the LLM. `async with` releases the semaphore only if
        # it was actually acquired (an acquire() inside try/finally would
        # over-release it when the acquire itself is cancelled).
        try:
            async with concurrency_sema:
                model_used = "gemini-v2"
                resp_ai = await llm_router.acompletion(
                    model=model_used,
                    messages=[
                        {"role": "user", "content": prompt(doc_id, full)}],
                    response_format=ExtractRequirementsResponse
                )
            return ExtractRequirementsResponse.model_validate_json(resp_ai.choices[0].message.content).requirements
        except Exception as e:
            fmt = "".join(traceback.format_exception(e))
            logging.error(f"LLM extraction failed for doc {doc_id} : {fmt}")
            return [DocRequirements(document=doc_id, context="Error LLM", requirements=[])]

    def progress_update(x):
        """Serialize a ProgressUpdate as one SSE `data:` frame."""
        return f"data: {x.model_dump_json()}\n\n"

    async def _stream_generator():
        """Run every document concurrently and yield SSE frames as they finish."""
        items = []
        n_processed = 0

        yield progress_update(ProgressUpdate(status="progress", data={}, total_docs=n_docs, processed_docs=0))

        # Coroutines are created here, once the stream is actually consumed,
        # so none is left un-awaited if the response is never iterated.
        pending = [_process_document(doc) for doc in documents]

        for finished in asyncio.as_completed(pending):
            result = await finished
            items.extend(result)
            n_processed += 1
            yield progress_update(ProgressUpdate(status="progress", data={}, total_docs=n_docs, processed_docs=n_processed))

        final_response = ExtractRequirementsResponse(requirements=items)

        yield progress_update(ProgressUpdate(status="complete", data=final_response.model_dump(), total_docs=n_docs, processed_docs=n_processed))

    return StreamingResponse(_stream_generator(), media_type="text/event-stream")
| |
|