"""3GPP Document Extractor API.

FastAPI service that locates the latest archive for a 3GPP specification,
downloads the zip, extracts the Word document inside, converts it to text
with LibreOffice, and caches the result.
"""
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import zipfile | |
| import io | |
| import os | |
| import subprocess | |
| import hashlib | |
| from pathlib import Path | |
| from typing import Optional | |
| import uvicorn | |
# FastAPI application instance; route handlers below are registered on it.
app = FastAPI(
    title="3GPP Document Extractor API",
    description="API to extract and read 3GPP specification documents from zip archives",
    version="1.0.0"
)
# Pydantic models for request/response
class SpecRequest(BaseModel):
    """Request body for POST /extract: which spec to fetch and whether to use the cache."""
    spec: str  # 3GPP spec identifier, e.g. "38.211"
    use_cache: bool = True  # False requests a fresh download/conversion
class DocumentResponse(BaseModel):
    """Response for POST /extract: the extracted document text plus provenance."""
    spec: str  # spec identifier that was requested
    url: str  # zip archive URL the content came from
    content: str  # full plain-text content of the document
    cached: bool  # True if served from the on-disk cache
    content_length: int  # len(content), for quick client-side sanity checks
class LinkResponse(BaseModel):
    """Response for GET /spec/{spec}/link: the spec's archive page and its last link."""
    spec: str  # spec identifier that was requested
    url: str  # base archive directory URL for the spec
    last_link: str  # full URL of the last clickable link on that page
class ErrorResponse(BaseModel):
    """Generic error payload (declared for API schema purposes)."""
    error: str  # short error name
    detail: str  # human-readable explanation
def get_last_link_from_3gpp_spec(spec: str) -> Optional[str]:
    """
    Fetches the last clickable link from a 3GPP specification page.

    Args:
        spec: The specification identifier (e.g., "38.211").

    Returns:
        The last clickable link URL, or None if not found or on request error.
    """
    series = spec.split(".")[0]  # series is the part before the dot, e.g. "38"
    doc_id = spec
    url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/"
    try:
        # Timeout so a stalled 3GPP server cannot hang the request forever.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        # Find all anchor tags (links)
        links = soup.find_all('a')
        # Filter out links that are just directory traversals or empty
        clickable_links = [
            link for link in links
            if link.get('href') and not link.get('href').startswith('../')
        ]
        if clickable_links:
            # Return the href of the last clickable link (newest archive entry)
            return clickable_links[-1].get('href')
        return None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching the page: {e}")
        return None
def extract_and_read_doc_from_zip_url(url: str, cache_dir: Optional[str] = "document_cache") -> tuple[Optional[str], bool]:
    """
    Downloads a zip file from a URL, extracts the first .docx or .doc file,
    reads its content using LibreOffice via subprocess, and returns the text.
    Uses caching to avoid re-processing the same files.

    Args:
        url: The URL of the zip file.
        cache_dir: Directory to store cached files. Pass None to bypass the
            cache lookup (callers use this for use_cache=False); the result
            is still written to the default cache directory.

    Returns:
        Tuple of (text_content, was_cached) where was_cached indicates if
        the result came from cache.
    """
    try:
        # Callers pass cache_dir=None to skip the cache read; Path(None)
        # would raise TypeError, so fall back to the default directory and
        # record that the lookup should be bypassed.
        use_cache = cache_dir is not None
        cache_path = Path(cache_dir if use_cache else "document_cache")
        cache_path.mkdir(parents=True, exist_ok=True)

        # Hash of the URL is the cache key for both the doc and the txt.
        url_hash = hashlib.md5(url.encode()).hexdigest()
        cached_txt_file = cache_path / f"{url_hash}.txt"
        if use_cache and cached_txt_file.exists():
            print(f"Found cached version for URL: {url}")
            with open(cached_txt_file, 'r', encoding='utf-8') as f:
                return f.read(), True

        print(f"No cache found, processing URL: {url}")
        # Download the zip file
        response = requests.get(url, stream=True)
        response.raise_for_status()

        # Use a BytesIO object to work with the zip data in memory
        zip_data = io.BytesIO(response.content)
        with zipfile.ZipFile(zip_data, 'r') as zip_ref:
            for file_info in zip_ref.infolist():
                filename = file_info.filename
                if filename.lower().endswith(('.docx', '.doc')):
                    print(f"Found .docx or .doc file: {filename}")
                    # Create a unique filename for the cached document
                    file_extension = os.path.splitext(filename)[1]
                    cached_doc_file = cache_path / f"{url_hash}{file_extension}"
                    # Extract the file to cache directory
                    zip_ref.extract(filename, cache_path)
                    extracted_filepath = cache_path / filename
                    # Move to standardized cache filename; replace() (unlike
                    # rename()) overwrites a stale file on all platforms.
                    extracted_filepath.replace(cached_doc_file)

                    txt_filepath = cache_path / f"{url_hash}.txt"
                    try:
                        # Run headless LibreOffice to convert the document to txt.
                        cmd = [
                            "libreoffice",
                            "--headless",
                            "--convert-to", "txt",
                            str(cached_doc_file),
                            "--outdir", str(cache_path)
                        ]
                        result = subprocess.run(
                            cmd,
                            capture_output=True,
                            text=True,
                            timeout=60  # 60 second timeout
                        )
                        if result.returncode != 0:
                            print(f"LibreOffice conversion failed with return code {result.returncode}")
                            print(f"stderr: {result.stderr}")
                            return None, False

                        # LibreOffice names the output after the input's base name.
                        original_base_name = os.path.splitext(os.path.basename(str(cached_doc_file)))[0]
                        converted_txt_file = cache_path / f"{original_base_name}.txt"
                        # Rename to our standardized cache filename if different
                        if converted_txt_file != txt_filepath and converted_txt_file.exists():
                            converted_txt_file.replace(txt_filepath)

                        # Read the converted text file
                        if txt_filepath.exists():
                            with open(txt_filepath, 'r', encoding='utf-8') as txt_file:
                                text_content = txt_file.read()
                            print(f"Successfully processed and cached document from: {url}")
                            return text_content, False
                        print(f"Error: Converted text file not found at {txt_filepath}")
                        return None, False
                    except subprocess.TimeoutExpired:
                        print("LibreOffice conversion timed out after 60 seconds")
                        return None, False
                    except FileNotFoundError:
                        print("Error: LibreOffice not found. Please ensure LibreOffice is installed and in your PATH.")
                        return None, False
                    except Exception as e:
                        print(f"Error running LibreOffice conversion: {e}")
                        return None, False

        print("No .docx or .doc file found in the zip archive.")
        return None, False
    except requests.exceptions.RequestException as e:
        print(f"Error downloading or processing the zip file: {e}")
        return None, False
    except zipfile.BadZipFile:
        print("Error: The downloaded file is not a valid zip file.")
        return None, False
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None, False
| # API Endpoints | |
@app.get("/")  # route decorator was missing: the handler was never registered
async def root():
    """Root endpoint with API information."""
    return {
        "message": "3GPP Document Extractor API",
        "version": "1.0.0",
        "endpoints": {
            "GET /": "API information",
            "GET /spec/{spec}/link": "Get last link for a 3GPP specification",
            "POST /extract": "Extract document content from 3GPP specification",
            "GET /health": "Health check"
        }
    }
@app.get("/health")  # path per root()'s endpoint map; decorator was missing
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "message": "API is running"}
@app.get("/spec/{spec}/link")  # path per root()'s endpoint map; decorator was missing
async def get_spec_link(spec: str):
    """
    Get the last clickable link for a 3GPP specification.

    Args:
        spec: The specification identifier (e.g., "38.211")

    Returns:
        LinkResponse with the specification and its last link

    Raises:
        HTTPException: 404 when no link is found, 500 on unexpected errors.
    """
    try:
        last_link = get_last_link_from_3gpp_spec(spec)
        if not last_link:
            raise HTTPException(
                status_code=404,
                detail=f"No clickable links found for specification {spec}"
            )
        # Construct full URL
        series = spec.split(".")[0]
        base_url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{spec}/"
        full_url = base_url + last_link
        return LinkResponse(
            spec=spec,
            url=base_url,
            last_link=full_url
        )
    except HTTPException:
        # Let the 404 above pass through; the broad handler below would
        # otherwise rewrap it as a 500 (matches extract_document's pattern).
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing specification {spec}: {str(e)}"
        )
@app.post("/extract")  # path per root()'s endpoint map; decorator was missing
async def extract_document(request: SpecRequest):
    """
    Extract and read document content from a 3GPP specification.

    Args:
        request: SpecRequest containing spec identifier and cache preference

    Returns:
        DocumentResponse with the extracted content

    Raises:
        HTTPException: 404 when no link is found, 400 when the link is not a
            zip, 500 when extraction fails or on unexpected errors.
    """
    try:
        # First, get the last link
        last_link = get_last_link_from_3gpp_spec(request.spec)
        if not last_link:
            raise HTTPException(
                status_code=404,
                detail=f"No clickable links found for specification {request.spec}"
            )
        # Construct full URL
        series = request.spec.split(".")[0]
        base_url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{request.spec}/"
        full_url = base_url + last_link
        # Check if it's a zip file
        if not full_url.lower().endswith('.zip'):
            raise HTTPException(
                status_code=400,
                detail=f"The last link is not a zip file: {full_url}"
            )
        # Extract and read the document; None signals a cache bypass to the
        # helper (see extract_and_read_doc_from_zip_url).
        cache_dir = "document_cache" if request.use_cache else None
        content, was_cached = extract_and_read_doc_from_zip_url(full_url, cache_dir)
        if not content:
            raise HTTPException(
                status_code=500,
                detail="Could not extract and read the document from the zip file"
            )
        return DocumentResponse(
            spec=request.spec,
            url=full_url,
            content=content,
            cached=was_cached,
            content_length=len(content)
        )
    except HTTPException:
        # Re-raise intended HTTP errors untouched (404/400/500 above).
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing specification {request.spec}: {str(e)}"
        )
# NOTE(review): this route is not listed in root()'s endpoint map; DELETE /cache
# is chosen by REST convention — confirm the intended path/method.
@app.delete("/cache")
async def clear_cache():
    """Clear all cached files."""
    try:
        cache_path = Path("document_cache")
        if cache_path.exists():
            files_deleted = 0
            # Remove regular files only; leave any subdirectories in place.
            for file in cache_path.glob("*"):
                if file.is_file():
                    file.unlink()
                    files_deleted += 1
            return {"message": f"Cache cleared successfully. {files_deleted} files deleted."}
        else:
            return {"message": "Cache directory does not exist."}
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error clearing cache: {str(e)}"
        )
# NOTE(review): this route is not listed in root()'s endpoint map; GET /cache
# is chosen by REST convention — confirm the intended path/method.
@app.get("/cache")
async def list_cache():
    """List all cached files with their sizes."""
    try:
        cache_path = Path("document_cache")
        if cache_path.exists():
            files = []
            for file in cache_path.glob("*"):
                if file.is_file():
                    size = file.stat().st_size
                    files.append({
                        "name": file.name,
                        "size": size,
                        "size_mb": round(size / (1024 * 1024), 2)
                    })
            return {
                "cache_directory": str(cache_path),
                "total_files": len(files),
                "files": files
            }
        else:
            return {
                "cache_directory": str(cache_path),
                "total_files": 0,
                "files": [],
                "message": "Cache directory does not exist"
            }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error listing cache: {str(e)}"
        )