import traceback
from fastapi import FastAPI, BackgroundTasks
from schemas import *
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from litellm.router import Router
from aiolimiter import AsyncLimiter
import pandas as pd
import asyncio
import re
import nltk

nltk.download('stopwords')
nltk.download('punkt_tab')
nltk.download('wordnet')

from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

import string
import subprocess
import requests
from dotenv import load_dotenv

load_dotenv()

import os
from lxml import etree
import zipfile
import io
import warnings

warnings.filterwarnings("ignore")

from bs4 import BeautifulSoup

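# FastAPI app with permissive CORS; the LiteLLM Router load-balances two Gemini
# models and is configured to fall back from gemini-v2 to gemini-v1 on failure.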
app = FastAPI(title="Requirements Extractor")
app.add_middleware(CORSMiddleware, allow_credentials=True, allow_headers=["*"], allow_methods=["*"], allow_origins=["*"])
llm_router = Router(
    model_list=[
        {"model_name": "gemini-v1", "litellm_params": {"model": "gemini/gemini-2.0-flash", "api_key": os.environ.get("GEMINI"), "max_retries": 10, "rpm": 15}},
        {"model_name": "gemini-v2", "litellm_params": {"model": "gemini/gemini-2.5-flash", "api_key": os.environ.get("GEMINI"), "max_retries": 10, "rpm": 10}},
    ],
    fallbacks=[{"gemini-v2": ["gemini-v1"]}],
    num_retries=10,
)

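# One AsyncLimiter per model, sized to that model's requests-per-minute budget,
# so concurrent document calls never exceed the configured rpm.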
limiter_mapping = {
    model["model_name"]: AsyncLimiter(model["litellm_params"]["rpm"], 60)
    for model in llm_router.model_list
}
lemmatizer = WordNetLemmatizer()

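# XML namespaces used for XPath queries against WordprocessingML (document.xml).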
NSMAP = {
    'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
    'v': 'urn:schemas-microsoft-com:vml'
}

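# Lowercase the text, strip punctuation and English stopwords, then lemmatize the remaining tokens.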
def lemma(text: str):
    stop_words = set(stopwords.words('english'))
    txt = text.translate(str.maketrans('', '', string.punctuation)).strip()
    tokens = [token for token in word_tokenize(txt.lower()) if token not in stop_words]
    return [lemmatizer.lemmatize(token) for token in tokens]

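# Download the TDoc zip, then either open the contained .docx directly or convert
# a legacy .doc to .docx with headless LibreOffice before returning it as a ZipFile.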
def get_docx_archive(url: str) -> zipfile.ZipFile:
    """Fetch the docx from the URL and return it as a ZipFile object."""
    if not url.endswith("zip"):
        raise ValueError("URL must point to a ZIP file")
    doc_id = os.path.splitext(os.path.basename(url))[0]
    resp = requests.get(url, verify=False, headers={
        "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    })
    resp.raise_for_status()

    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        for file_name in zf.namelist():
            if file_name.endswith(".docx"):
                docx_bytes = zf.read(file_name)
                return zipfile.ZipFile(io.BytesIO(docx_bytes))
            elif file_name.endswith(".doc"):
                input_path = f"/tmp/{doc_id}.doc"
                output_path = f"/tmp/{doc_id}.docx"
                docx_bytes = zf.read(file_name)

                with open(input_path, "wb") as f:
                    f.write(docx_bytes)

                subprocess.run([
                    "libreoffice",
                    "--headless",
                    "--convert-to", "docx",
                    "--outdir", "/tmp",
                    input_path
                ], check=True)

                with open(output_path, "rb") as f:
                    docx_bytes = f.read()

                os.remove(input_path)
                os.remove(output_path)

                return zipfile.ZipFile(io.BytesIO(docx_bytes))

    raise ValueError("No docx/doc file found in the archive")

def parse_document_xml(docx_zip: zipfile.ZipFile) -> etree._Element:
    """Parse the main document.xml part."""
    xml_bytes = docx_zip.read('word/document.xml')
    parser = etree.XMLParser(remove_blank_text=True)
    return etree.fromstring(xml_bytes, parser=parser)

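# Resolve tracked changes in place: drop deleted runs (w:del), unwrap inserted
# runs (w:ins) so their children survive, and strip comment anchors.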
def clean_document_xml(root: etree._Element) -> None:
    """Clean the XML by modifying the tree in place."""
    for del_elem in root.xpath('//w:del', namespaces=NSMAP):
        parent = del_elem.getparent()
        if parent is not None:
            parent.remove(del_elem)

    for ins_elem in root.xpath('//w:ins', namespaces=NSMAP):
        parent = ins_elem.getparent()
        index = parent.index(ins_elem)
        # Copy the children into a list first: inserting a child into the parent
        # detaches it from ins_elem, which would break direct iteration.
        for child in list(ins_elem):
            parent.insert(index, child)
            index += 1
        parent.remove(ins_elem)

    for tag in ['w:commentRangeStart', 'w:commentRangeEnd', 'w:commentReference']:
        for elem in root.xpath(f'//{tag}', namespaces=NSMAP):
            parent = elem.getparent()
            if parent is not None:
                parent.remove(elem)

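# Rebuild the .docx archive: copy every part unchanged except word/document.xml,
# which is re-serialized from the cleaned tree.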
def create_modified_docx(original_zip: zipfile.ZipFile, modified_root: etree._Element) -> bytes:
    """Create a new docx containing the modified XML."""
    output = io.BytesIO()

    with zipfile.ZipFile(output, 'w', compression=zipfile.ZIP_DEFLATED) as new_zip:
        for file in original_zip.infolist():
            if file.filename != 'word/document.xml':
                new_zip.writestr(file, original_zip.read(file.filename))

        xml_str = etree.tostring(
            modified_root,
            xml_declaration=True,
            encoding='UTF-8',
            pretty_print=True
        )
        new_zip.writestr('word/document.xml', xml_str)

    output.seek(0)
    return output.getvalue()

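# End-to-end text extraction for one TDoc: download and clean the document, convert
# it to plain text with headless LibreOffice, and return its non-empty lines.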
def docx_to_txt(doc_id: str, url: str):
    docx_zip = get_docx_archive(url)
    root = parse_document_xml(docx_zip)
    clean_document_xml(root)
    modified_bytes = create_modified_docx(docx_zip, root)

    input_path = f"/tmp/{doc_id}_cleaned.docx"
    output_path = f"/tmp/{doc_id}_cleaned.txt"
    with open(input_path, "wb") as f:
        f.write(modified_bytes)

    subprocess.run([
        "libreoffice",
        "--headless",
        "--convert-to", "txt",
        "--outdir", "/tmp",
        input_path
    ], check=True)

    with open(output_path, "r", encoding="utf-8") as f:
        txt_data = [line.strip() for line in f if line.strip()]

    os.remove(input_path)
    os.remove(output_path)
    return txt_data

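# Serve the single-page front end.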
@app.get("/")
def render_page():
    return FileResponse("index.html")

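# List the meetings of a working group by scraping the 3GPP FTP index: find the
# TSG folder, pick the WG subfolder, and map display names to folder names.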
@app.post("/get_meetings", response_model=MeetingsResponse)
def get_meetings(req: MeetingsRequest):
    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    wg_number = re.search(r"\d", working_group).group(0)
    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")
    meeting_folders = []
    all_meetings = []
    wg_folders = [item.get_text() for item in soup.select("tr td a")]
    selected_folder = None
    for folder in wg_folders:
        if wg_number in folder:
            selected_folder = folder
            break

    if selected_folder:
        url += "/" + selected_folder
        resp = requests.get(url, verify=False)
        soup = BeautifulSoup(resp.text, "html.parser")
        meeting_folders = [item.get_text() for item in soup.select("tr td a") if item.get_text().startswith("TSG")]
        all_meetings = [working_group + "#" + meeting.split("_", 1)[1].replace("_", " ").replace("-", " ") for meeting in meeting_folders]

    return MeetingsResponse(meetings=dict(zip(all_meetings, meeting_folders)))

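# Build the change-request table for a meeting: locate the meeting's docs folder,
# read the TDoc list spreadsheet, and keep uploaded pCRs and category B/C CRs.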
@app.post("/get_dataframe", response_model=DataResponse)
def get_change_request_dataframe(req: DataRequest):
    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    wg_number = re.search(r"\d", working_group).group(0)
    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")
    wg_folders = [item.get_text() for item in soup.select("tr td a")]
    selected_folder = None
    for folder in wg_folders:
        if wg_number in folder:
            selected_folder = folder
            break

    url += "/" + selected_folder + "/" + req.meeting + "/docs"
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")
    files = [item.get_text() for item in soup.select("tr td a") if item.get_text().endswith(".xlsx")]

    def gen_url(tdoc: str):
        return f"{url}/{tdoc}.zip"

    df = pd.read_excel(str(url + "/" + files[0]).replace("#", "%23"))
    is_relevant_cr = (df["Type"] == "CR") & df["CR category"].isin(["B", "C"])
    filtered_df = df[(is_relevant_cr | (df["Type"] == "pCR")) & ~df["Uploaded"].isna()][
        ["TDoc", "Title", "CR category", "Source", "Type", "Agenda item", "Agenda item description", "TDoc Status"]
    ]
    filtered_df["URL"] = filtered_df["TDoc"].apply(gen_url)

    df = filtered_df.fillna("")
    return DataResponse(data=df[["TDoc", "Title", "Type", "TDoc Status", "Agenda item description", "URL"]].to_dict(orient="records"))

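# Extract requirements from every selected document with the LLM. Documents are
# processed concurrently in batches of 30, each call guarded by the per-model
# rate limiter, with a retry on an explicit rate-limit error.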
@app.post("/generate_requirements", response_model=RequirementsResponse)
async def gen_reqs(req: RequirementsRequest, background_tasks: BackgroundTasks):
    documents = req.documents
    n_docs = len(documents)

    async def process_document(doc):
        doc_id = doc.document
        url = doc.url
        try:
            full = "\n".join(docx_to_txt(doc_id, url))
        except Exception as e:
            traceback.print_exception(e)
            return RequirementsResponse(requirements=[DocRequirements(document=doc_id, context="Error LLM", requirements=[])]).requirements

        prompt = f"Here's the document whose ID is {doc_id} : {full}\n\nExtract all requirements and group them by context, returning a list of objects where each object includes a document ID, a concise description of the context where the requirements apply (not a chapter title or copied text), and a list of associated requirements; always return the result as a list, even if only one context is found."

        try:
            model_used = "gemini-v2"
            async with limiter_mapping[model_used]:
                resp_ai = await llm_router.acompletion(
                    model=model_used,
                    messages=[{"role": "user", "content": prompt}],
                    response_format=RequirementsResponse
                )
            return RequirementsResponse.model_validate_json(resp_ai.choices[0].message.content).requirements
        except Exception as e:
            if "rate limit" in str(e).lower():
                try:
                    # Retry on the fallback model when gemini-v2 is rate limited.
                    model_used = "gemini-v1"
                    async with limiter_mapping[model_used]:
                        resp_ai = await llm_router.acompletion(
                            model=model_used,
                            messages=[{"role": "user", "content": prompt}],
                            response_format=RequirementsResponse
                        )
                    return RequirementsResponse.model_validate_json(resp_ai.choices[0].message.content).requirements
                except Exception as fallback_e:
                    traceback.print_exception(fallback_e)
                    return RequirementsResponse(requirements=[DocRequirements(document=doc_id, context="Error LLM", requirements=[])]).requirements
            else:
                traceback.print_exception(e)
                return RequirementsResponse(requirements=[DocRequirements(document=doc_id, context="Error LLM", requirements=[])]).requirements

    async def process_batch(batch):
        results = await asyncio.gather(*(process_document(doc) for doc in batch))
        return [item for sublist in results for item in sublist]

    all_requirements = []

    if n_docs <= 30:
        batch_results = await process_batch(documents)
        all_requirements.extend(batch_results)
    else:
        batch_size = 30
        batches = [documents[i:i + batch_size] for i in range(0, n_docs, batch_size)]

        for i, batch in enumerate(batches):
            batch_results = await process_batch(batch)
            all_requirements.extend(batch_results)

            if i < len(batches) - 1:
                # Pause between batches so the next batch starts with a fresh rpm window.
                await asyncio.sleep(60)
    return RequirementsResponse(requirements=all_requirements)

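# Given the extracted requirements and a free-text problem description, ask the
# LLM to return the most relevant (document, context, requirement) triples.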
@app.post("/get_reqs_from_query", response_model=ReqSearchResponse)
def find_requirements_from_problem_description(req: ReqSearchRequest):
    requirements = req.requirements
    query = req.query

    requirements_text = "\n".join([f"[Document: {r.document} | Context: {r.context} | Requirement: {r.requirement}]" for r in requirements])

    print("Called the LLM")
    resp_ai = llm_router.completion(
        model="gemini-v2",
        messages=[{"role": "user", "content": f"Given all the requirements : \n {requirements_text} \n and the problem description \"{query}\", return a list of objects each with document ID, context, and requirement for the most relevant requirements that reference or best cover the problem."}],
        response_format=ReqSearchResponse
    )
    print("Answered")

    return ReqSearchResponse.model_validate_json(resp_ai.choices[0].message.content)