Spaces:
Sleeping
Sleeping
File size: 13,070 Bytes
e7d3262 ec70242 e7d3262 ec70242 e7d3262 ec70242 e7d3262 ec70242 e7d3262 ec70242 e7d3262 ec70242 847301a ec70242 e7d3262 ec70242 33f5fb8 ec70242 33f5fb8 ec70242 e7d3262 ec70242 e7d3262 ec70242 33f5fb8 ec70242 e7d3262 ec70242 e7d3262 ec70242 e7d3262 ec70242 e7d3262 ec70242 e7d3262 ec70242 e7d3262 ec70242 e7d3262 ec70242 e7d3262 ec70242 e7d3262 ec70242 e7d3262 ec70242 e7d3262 ec70242 e7d3262 ec70242 e78ae06 ec70242 e78ae06 ec70242 e7d3262 ec70242 e7d3262 ec70242 e7d3262 ec70242 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 |
import hashlib
import io
import os
import subprocess
import tempfile
import zipfile
from pathlib import Path
from typing import Optional

import requests
import uvicorn
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# FastAPI application instance; this metadata appears in the generated
# OpenAPI/Swagger documentation.
app = FastAPI(
title="3GPP Document Extractor API",
description="API to extract and read 3GPP specification documents from zip archives",
version="1.0.0"
)
# Pydantic models for request/response
class SpecRequest(BaseModel):
# Request body for POST /extract.
spec: str  # 3GPP specification identifier, e.g. "38.211"
use_cache: bool = True  # when False, the endpoint bypasses the on-disk cache
class DocumentResponse(BaseModel):
# Response body for POST /extract.
spec: str  # the requested specification identifier
url: str  # full URL of the zip archive the content came from
content: str  # extracted plain-text document content
cached: bool  # True if the content was served from the cache
content_length: int  # len(content), in characters
class LinkResponse(BaseModel):
# Response body for GET /spec/{spec}/link.
spec: str  # the requested specification identifier
url: str  # the archive directory page that was scraped
last_link: str  # full URL of the last link found on that page
class ErrorResponse(BaseModel):
# Generic error payload. NOTE(review): not referenced by any endpoint in
# this portion of the file — presumably intended for response_model use.
error: str
detail: str
def get_last_link_from_3gpp_spec(spec: str) -> Optional[str]:
    """
    Fetch the last clickable link from a 3GPP specification archive page.

    Args:
        spec: The specification identifier (e.g., "38.211"). The series
            directory is taken from the part before the first dot.

    Returns:
        The href of the last non-parent-directory link on the page, or
        None if no such link exists or the request fails.
    """
    series = spec.split(".")[0]
    url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{spec}/"
    try:
        # Timeout so a stalled 3gpp.org connection cannot hang the
        # calling endpoint indefinitely (the original had no timeout).
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        # Keep only real links, skipping empty hrefs and the
        # "parent directory" (../) entry.
        clickable_links = [
            link.get('href')
            for link in soup.find_all('a')
            if link.get('href') and not link.get('href').startswith('../')
        ]
        return clickable_links[-1] if clickable_links else None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching the page: {e}")
        return None
def _convert_doc_with_libreoffice(doc_path: Path, out_dir: Path) -> Optional[Path]:
    """
    Convert a .doc/.docx file to plain text using headless LibreOffice.

    Args:
        doc_path: Path of the document to convert.
        out_dir: Directory LibreOffice writes the .txt output into.

    Returns:
        Path of the converted .txt file, or None on any failure
        (non-zero exit, timeout, LibreOffice missing, output absent).
    """
    cmd = [
        "libreoffice",
        "--headless",
        "--convert-to", "txt",
        str(doc_path),
        "--outdir", str(out_dir),
    ]
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=60  # 60 second timeout
        )
    except subprocess.TimeoutExpired:
        print("LibreOffice conversion timed out after 60 seconds")
        return None
    except FileNotFoundError:
        print("Error: LibreOffice not found. Please ensure LibreOffice is installed and in your PATH.")
        return None
    except Exception as e:
        print(f"Error running LibreOffice conversion: {e}")
        return None
    if result.returncode != 0:
        print(f"LibreOffice conversion failed with return code {result.returncode}")
        print(f"stderr: {result.stderr}")
        return None
    # LibreOffice names the output after the input file's base name.
    converted = out_dir / f"{doc_path.stem}.txt"
    return converted if converted.exists() else None


def extract_and_read_doc_from_zip_url(url: str, cache_dir: Optional[str] = "document_cache") -> tuple[Optional[str], bool]:
    """
    Download a zip file, extract the first .docx/.doc inside it, convert it
    to text with LibreOffice, and return the text.

    Results are cached per URL (keyed by an MD5 of the URL, used only as a
    cache key, not for security) so the same archive is not downloaded and
    converted twice.

    Args:
        url: The URL of the zip file.
        cache_dir: Directory for cached files. If None, a throwaway
            temporary directory is used, effectively disabling caching.
            (Previously Path(None) raised TypeError, so callers passing
            None always got a failure.)

    Returns:
        Tuple of (text_content, was_cached); text_content is None on failure.
    """
    try:
        use_cache = cache_dir is not None
        cache_path = Path(cache_dir) if use_cache else Path(tempfile.mkdtemp())
        cache_path.mkdir(exist_ok=True)
        # Hash of the URL is the cache key for both document and text files.
        url_hash = hashlib.md5(url.encode()).hexdigest()
        cached_txt_file = cache_path / f"{url_hash}.txt"
        if use_cache and cached_txt_file.exists():
            print(f"Found cached version for URL: {url}")
            with open(cached_txt_file, 'r', encoding='utf-8') as f:
                return f.read(), True
        print(f"No cache found, processing URL: {url}")
        # Download the archive; timeout keeps a dead server from hanging us.
        response = requests.get(url, stream=True, timeout=60)
        response.raise_for_status()
        zip_data = io.BytesIO(response.content)
        with zipfile.ZipFile(zip_data, 'r') as zip_ref:
            for file_info in zip_ref.infolist():
                filename = file_info.filename
                if not filename.lower().endswith(('.docx', '.doc')):
                    continue
                # (Fixed: original printed the literal "(unknown)" here.)
                print(f"Found .docx or .doc file: {filename}")
                file_extension = os.path.splitext(filename)[1]
                cached_doc_file = cache_path / f"{url_hash}{file_extension}"
                # Extract, then move to the standardized cache filename.
                zip_ref.extract(filename, cache_path)
                (cache_path / filename).rename(cached_doc_file)
                converted = _convert_doc_with_libreoffice(cached_doc_file, cache_path)
                if converted is None:
                    return None, False
                # Normalize the converted file to the hash-based cache name.
                if converted != cached_txt_file:
                    converted.rename(cached_txt_file)
                with open(cached_txt_file, 'r', encoding='utf-8') as txt_file:
                    text_content = txt_file.read()
                print(f"Successfully processed and cached document from: {url}")
                return text_content, False
        print("No .docx or .doc file found in the zip archive.")
        return None, False
    except requests.exceptions.RequestException as e:
        print(f"Error downloading or processing the zip file: {e}")
        return None, False
    except zipfile.BadZipFile:
        print("Error: The downloaded file is not a valid zip file.")
        return None, False
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None, False
# API Endpoints
@app.get("/")
async def root():
    """Describe the API: name, version, and the available endpoints."""
    endpoint_index = {
        "GET /": "API information",
        "GET /spec/{spec}/link": "Get last link for a 3GPP specification",
        "POST /extract": "Extract document content from 3GPP specification",
        "GET /health": "Health check",
    }
    return {
        "message": "3GPP Document Extractor API",
        "version": "1.0.0",
        "endpoints": endpoint_index,
    }
@app.get("/health")
async def health_check():
    """Liveness probe: report that the service is up."""
    payload = {"status": "healthy", "message": "API is running"}
    return payload
@app.get("/spec/{spec}/link", response_model=LinkResponse)
async def get_spec_link(spec: str):
    """
    Get the last clickable link for a 3GPP specification.

    Args:
        spec: The specification identifier (e.g., "38.211")

    Returns:
        LinkResponse with the archive page URL and the full URL of the
        last link on it.

    Raises:
        HTTPException: 404 if no link is found, 500 on unexpected errors.
    """
    try:
        last_link = get_last_link_from_3gpp_spec(spec)
        if not last_link:
            raise HTTPException(
                status_code=404,
                detail=f"No clickable links found for specification {spec}"
            )
        # Construct full URL
        series = spec.split(".")[0]
        base_url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{spec}/"
        full_url = base_url + last_link
        return LinkResponse(
            spec=spec,
            url=base_url,
            last_link=full_url
        )
    except HTTPException:
        # Let deliberate HTTP errors (e.g. the 404 above) pass through
        # instead of being rewrapped as a 500 by the handler below.
        # (Matches the pattern already used by the /extract endpoint.)
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing specification {spec}: {str(e)}"
        )
@app.post("/extract", response_model=DocumentResponse)
async def extract_document(request: SpecRequest):
    """
    Extract and read document content from a 3GPP specification.

    Args:
        request: SpecRequest containing spec identifier and cache preference

    Returns:
        DocumentResponse with the extracted content

    Raises:
        HTTPException: 404 if no link is found, 400 if the link is not a
            zip file, 500 if extraction fails.
    """
    try:
        # First, get the last link
        last_link = get_last_link_from_3gpp_spec(request.spec)
        if not last_link:
            raise HTTPException(
                status_code=404,
                detail=f"No clickable links found for specification {request.spec}"
            )
        # Construct full URL
        series = request.spec.split(".")[0]
        base_url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{request.spec}/"
        full_url = base_url + last_link
        # Check if it's a zip file
        if not full_url.lower().endswith('.zip'):
            raise HTTPException(
                status_code=400,
                detail=f"The last link is not a zip file: {full_url}"
            )
        # When caching is disabled, process in a throwaway temp directory.
        # (Previously this passed None, which crashed the extractor via
        # Path(None) — so use_cache=False always returned a 500.)
        cache_dir = "document_cache" if request.use_cache else tempfile.mkdtemp()
        content, was_cached = extract_and_read_doc_from_zip_url(full_url, cache_dir)
        if not content:
            raise HTTPException(
                status_code=500,
                detail="Could not extract and read the document from the zip file"
            )
        return DocumentResponse(
            spec=request.spec,
            url=full_url,
            content=content,
            cached=was_cached,
            content_length=len(content)
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing specification {request.spec}: {str(e)}"
        )
@app.delete("/cache")
async def clear_cache():
    """Delete every regular file in the document cache directory."""
    try:
        cache_path = Path("document_cache")
        if not cache_path.exists():
            return {"message": "Cache directory does not exist."}
        deleted = 0
        for entry in cache_path.glob("*"):
            if entry.is_file():
                entry.unlink()
                deleted += 1
        return {"message": f"Cache cleared successfully. {deleted} files deleted."}
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error clearing cache: {str(e)}"
        )
@app.get("/cache")
async def list_cache():
    """List every cached file with its size in bytes and megabytes."""
    try:
        cache_path = Path("document_cache")
        if not cache_path.exists():
            return {
                "cache_directory": str(cache_path),
                "total_files": 0,
                "files": [],
                "message": "Cache directory does not exist"
            }
        entries = []
        for item in cache_path.glob("*"):
            if not item.is_file():
                continue
            byte_count = item.stat().st_size
            entries.append({
                "name": item.name,
                "size": byte_count,
                "size_mb": round(byte_count / (1024 * 1024), 2)
            })
        return {
            "cache_directory": str(cache_path),
            "total_files": len(entries),
            "files": entries
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error listing cache: {str(e)}"
        )