from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import requests
from bs4 import BeautifulSoup
import zipfile
import io
import os
import subprocess
import hashlib
from pathlib import Path
from typing import Optional
import uvicorn

app = FastAPI(
    title="3GPP Document Extractor API",
    description="API to extract and read 3GPP specification documents from zip archives",
    version="1.0.0"
)


# Pydantic models for request/response

class SpecRequest(BaseModel):
    # Request body for POST /extract: which spec to fetch, and whether the
    # on-disk cache may be used.
    spec: str
    use_cache: bool = True


class DocumentResponse(BaseModel):
    # Successful POST /extract response: extracted text plus provenance.
    spec: str
    url: str
    content: str
    cached: bool
    content_length: int


class LinkResponse(BaseModel):
    # Response for GET /spec/{spec}/link.
    spec: str
    url: str
    last_link: str


class ErrorResponse(BaseModel):
    # Generic error payload (kept for OpenAPI documentation).
    error: str
    detail: str


def get_last_link_from_3gpp_spec(spec: str) -> Optional[str]:
    """
    Fetches the last clickable link from a 3GPP specification page.

    Args:
        spec: The specification identifier (e.g., "38.211"). The series
            directory is derived from the part before the first dot.

    Returns:
        The last clickable link URL (typically the newest version archive),
        or None if not found or on a network error.
    """
    series = spec.split(".")[0]
    url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{spec}/"
    try:
        # Timeout so a stalled 3GPP server cannot hang the request forever.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        # Keep only real entries: skip empty hrefs and parent-directory links.
        clickable_links = [
            link for link in soup.find_all('a')
            if link.get('href') and not link.get('href').startswith('../')
        ]
        if clickable_links:
            # Directory listings are ordered, so the last link is the newest.
            return clickable_links[-1].get('href')
        return None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching the page: {e}")
        return None


def extract_and_read_doc_from_zip_url(url: str, cache_dir: Optional[str] = "document_cache") -> tuple[Optional[str], bool]:
    """
    Downloads a zip file from a URL, extracts the first .docx or .doc file,
    reads its content using LibreOffice via subprocess, and returns the text.
    Uses caching to avoid re-processing the same files.

    Args:
        url: The URL of the zip file.
        cache_dir: Directory to store cached files. May now be None, in which
            case the default directory is used (previously Path(None) raised
            TypeError).

    Returns:
        Tuple of (text_content, was_cached) where was_cached indicates if
        result came from cache; text_content is None on any failure.
    """
    try:
        # BUG FIX: callers pass None when use_cache=False; fall back to the
        # default directory instead of crashing in Path(None).
        cache_path = Path(cache_dir or "document_cache")
        cache_path.mkdir(parents=True, exist_ok=True)

        # Hash of the URL is the cache key (not security-sensitive, so MD5 is fine).
        url_hash = hashlib.md5(url.encode()).hexdigest()

        cached_txt_file = cache_path / f"{url_hash}.txt"
        if cached_txt_file.exists():
            print(f"Found cached version for URL: {url}")
            with open(cached_txt_file, 'r', encoding='utf-8') as f:
                return f.read(), True

        print(f"No cache found, processing URL: {url}")

        # Download the zip file (timeout so a dead server cannot hang us).
        response = requests.get(url, stream=True, timeout=60)
        response.raise_for_status()

        # Use a BytesIO object to work with the zip data in memory.
        zip_data = io.BytesIO(response.content)
        with zipfile.ZipFile(zip_data, 'r') as zip_ref:
            for file_info in zip_ref.infolist():
                filename = file_info.filename
                if not filename.lower().endswith(('.docx', '.doc')):
                    continue
                # BUG FIX: this log line previously printed a literal
                # placeholder instead of the member name.
                print(f"Found .docx or .doc file: {filename}")

                # Write the member straight to a standardized cache name.
                # Reading via zip_ref.read avoids extracting attacker-chosen
                # nested member paths under the cache directory (zip slip).
                file_extension = os.path.splitext(filename)[1]
                cached_doc_file = cache_path / f"{url_hash}{file_extension}"
                cached_doc_file.write_bytes(zip_ref.read(file_info))

                txt_filepath = cache_path / f"{url_hash}.txt"
                try:
                    # Headless LibreOffice converts the document to plain text.
                    cmd = [
                        "libreoffice",
                        "--headless",
                        "--convert-to", "txt",
                        str(cached_doc_file),
                        "--outdir", str(cache_path)
                    ]
                    result = subprocess.run(
                        cmd,
                        capture_output=True,
                        text=True,
                        timeout=60  # 60 second timeout
                    )

                    if result.returncode != 0:
                        print(f"LibreOffice conversion failed with return code {result.returncode}")
                        print(f"stderr: {result.stderr}")
                        return None, False

                    # LibreOffice names the output after the input's base name;
                    # rename it to our hash-based cache name if they differ.
                    original_base_name = os.path.splitext(os.path.basename(str(cached_doc_file)))[0]
                    converted_txt_file = cache_path / f"{original_base_name}.txt"
                    if converted_txt_file != txt_filepath and converted_txt_file.exists():
                        converted_txt_file.rename(txt_filepath)

                    # Read the converted text file.
                    if txt_filepath.exists():
                        with open(txt_filepath, 'r', encoding='utf-8') as txt_file:
                            text_content = txt_file.read()
                        print(f"Successfully processed and cached document from: {url}")
                        return text_content, False

                    print(f"Error: Converted text file not found at {txt_filepath}")
                    return None, False

                except subprocess.TimeoutExpired:
                    print("LibreOffice conversion timed out after 60 seconds")
                    return None, False
                except FileNotFoundError:
                    print("Error: LibreOffice not found. Please ensure LibreOffice is installed and in your PATH.")
                    return None, False
                except Exception as e:
                    print(f"Error running LibreOffice conversion: {e}")
                    return None, False

        print("No .docx or .doc file found in the zip archive.")
        return None, False

    except requests.exceptions.RequestException as e:
        print(f"Error downloading or processing the zip file: {e}")
        return None, False
    except zipfile.BadZipFile:
        print("Error: The downloaded file is not a valid zip file.")
        return None, False
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None, False


# API Endpoints

@app.get("/")
async def root():
    """Root endpoint with API information"""
    return {
        "message": "3GPP Document Extractor API",
        "version": "1.0.0",
        "endpoints": {
            "GET /": "API information",
            "GET /spec/{spec}/link": "Get last link for a 3GPP specification",
            "POST /extract": "Extract document content from 3GPP specification",
            "GET /health": "Health check"
        }
    }


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "message": "API is running"}
@app.get("/spec/{spec}/link", response_model=LinkResponse)
async def get_spec_link(spec: str):
    """
    Get the last clickable link for a 3GPP specification.

    Args:
        spec: The specification identifier (e.g., "38.211")

    Returns:
        LinkResponse with the specification and its last link

    Raises:
        HTTPException: 404 when no links are found, 500 on unexpected errors.
    """
    try:
        last_link = get_last_link_from_3gpp_spec(spec)
        if not last_link:
            raise HTTPException(
                status_code=404,
                detail=f"No clickable links found for specification {spec}"
            )

        # Construct full URL from the archive directory plus the relative href.
        series = spec.split(".")[0]
        base_url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{spec}/"
        full_url = base_url + last_link

        return LinkResponse(
            spec=spec,
            url=base_url,
            last_link=full_url
        )
    except HTTPException:
        # BUG FIX: HTTPException subclasses Exception, so the 404 above was
        # previously caught below and rewrapped as a 500. Let it propagate.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing specification {spec}: {str(e)}"
        )


@app.post("/extract", response_model=DocumentResponse)
async def extract_document(request: SpecRequest):
    """
    Extract and read document content from a 3GPP specification.

    Args:
        request: SpecRequest containing spec identifier and cache preference

    Returns:
        DocumentResponse with the extracted content

    Raises:
        HTTPException: 404 when no links are found, 400 when the newest link
            is not a zip archive, 500 when extraction/conversion fails.
    """
    try:
        # First, get the last link on the spec's archive page.
        last_link = get_last_link_from_3gpp_spec(request.spec)
        if not last_link:
            raise HTTPException(
                status_code=404,
                detail=f"No clickable links found for specification {request.spec}"
            )

        # Construct full URL.
        series = request.spec.split(".")[0]
        base_url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{request.spec}/"
        full_url = base_url + last_link

        # Check if it's a zip file.
        if not full_url.lower().endswith('.zip'):
            raise HTTPException(
                status_code=400,
                detail=f"The last link is not a zip file: {full_url}"
            )

        cache_dir = "document_cache"
        # BUG FIX: use_cache=False previously passed cache_dir=None, which
        # crashed the extractor (Path(None) -> TypeError -> 500). Honor the
        # flag instead by evicting any cached text for this URL so the
        # document is re-downloaded and re-converted.
        if not request.use_cache:
            stale = Path(cache_dir) / f"{hashlib.md5(full_url.encode()).hexdigest()}.txt"
            if stale.exists():
                stale.unlink()

        # Extract and read the document.
        content, was_cached = extract_and_read_doc_from_zip_url(full_url, cache_dir)

        # `is None` so a legitimately empty document is not reported as failure.
        if content is None:
            raise HTTPException(
                status_code=500,
                detail="Could not extract and read the document from the zip file"
            )

        return DocumentResponse(
            spec=request.spec,
            url=full_url,
            content=content,
            cached=was_cached,
            content_length=len(content)
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing specification {request.spec}: {str(e)}"
        )


@app.delete("/cache")
async def clear_cache():
    """Clear all cached files"""
    try:
        cache_path = Path("document_cache")
        if not cache_path.exists():
            return {"message": "Cache directory does not exist."}

        files_deleted = 0
        for file in cache_path.glob("*"):
            if file.is_file():
                file.unlink()
                files_deleted += 1
        return {"message": f"Cache cleared successfully. {files_deleted} files deleted."}
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error clearing cache: {str(e)}"
        )


@app.get("/cache")
async def list_cache():
    """List all cached files"""
    try:
        cache_path = Path("document_cache")
        if not cache_path.exists():
            return {
                "cache_directory": str(cache_path),
                "total_files": 0,
                "files": [],
                "message": "Cache directory does not exist"
            }

        files = []
        for file in cache_path.glob("*"):
            if file.is_file():
                size = file.stat().st_size
                files.append({
                    "name": file.name,
                    "size": size,
                    "size_mb": round(size / (1024 * 1024), 2)
                })
        return {
            "cache_directory": str(cache_path),
            "total_files": len(files),
            "files": files
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error listing cache: {str(e)}"
        )