llm-ready-data / app /api /v1 /convert.py
light-infer-chat's picture
ok
7d1ad3f
Raw
History Blame Contribute Delete
9.69 kB
from __future__ import annotations
import asyncio
import concurrent.futures
import json as json_mod
import os
from typing import Annotated, Any, Dict, Optional
from urllib.parse import urlparse
import httpx
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile, status
from pydantic import BaseModel
from app.config import get_settings
from app.api.deps import (
get_converter_service,
get_extraction_service,
get_text_cleaner_service,
require_auth,
)
from app.core.logger import get_logger
from app.models.domain import ConversionError, count_tokens
from app.models.schemas import ConversionMetadata, ConversionResponse, UrlRequest
from app.services.converter_service import ConverterService
from app.services.extraction_service import ExtractionService
from app.services.text_cleaner_service import TextCleanerService
# Router on which the endpoints defined in this module are registered.
router = APIRouter()
_logger = get_logger(__name__)
# Settings are read once, at import time; the upload limit is taken from them.
_settings = get_settings()
_MAX_UPLOAD_BYTES = _settings.max_upload_bytes
# Shared thread pool used with run_in_executor so that service calls
# (conversion, cleaning, extraction) run off the event loop.
# Size is min(32, CPU count + 4); a CPU count of 1 is assumed when
# os.cpu_count() returns None.
_MAX_WORKERS = min(32, (os.cpu_count() or 1) + 4)
_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=_MAX_WORKERS)
def _build_metadata(result) -> ConversionMetadata:
    """Copy the statistics carried by a conversion result into response metadata.

    Args:
        result: Conversion result exposing the attributes listed in
            ``copied_fields`` below.

    Returns:
        A ``ConversionMetadata`` whose fields mirror ``result`` one-to-one.
    """
    # Every metadata field has an identically named attribute on the result.
    copied_fields = (
        "source",
        "char_count",
        "word_count",
        "line_count",
        "file_size_bytes",
        "mime_type",
        "content_hash",
        "token_estimate",
    )
    values = {name: getattr(result, name) for name in copied_fields}
    return ConversionMetadata(**values)
async def _build_response(
    result,
    *,
    return_json: bool = False,
    filename: Optional[str] = None,
    raw_data: Optional[bytes] = None,
    mappings: Optional[Dict[str, Dict[str, Any]]] = None,
    extraction_service: ExtractionService = None,
    clean_content: bool = False,
    text_cleaner_service: TextCleanerService = None,
) -> ConversionResponse:
    """Assemble the API response for a successful conversion.

    Args:
        result: Successful conversion result; its ``markdown`` is the base
            content and its statistics feed the metadata.
        return_json: When true (and ``filename`` and ``extraction_service``
            are both truthy) structured extraction is run on the content.
        filename: Name handed to the extraction service.
        raw_data: Original bytes handed to the extraction service, if any.
        mappings: Field mappings handed to the extraction service, if any.
        extraction_service: Service providing ``extract_structured``.
        clean_content: When true (and ``text_cleaner_service`` is truthy)
            the markdown is passed through the cleaner first.
        text_cleaner_service: Service providing ``clean``.

    Returns:
        A ``ConversionResponse`` with ``success=True``. If the extraction
        result contains an ``"error"`` key, its value is reported as
        ``error_message`` and ``json_content`` stays ``None``.
    """
    import hashlib

    event_loop = asyncio.get_running_loop()
    original_markdown = result.markdown

    # Step 1: optional cleaning, executed in the shared thread pool.
    body_text = original_markdown
    if clean_content and text_cleaner_service:
        body_text = await event_loop.run_in_executor(
            _thread_pool, text_cleaner_service.clean, original_markdown
        )

    # Step 2: optional structured extraction on the (possibly cleaned) text.
    structured: Optional[Any] = None
    failure: Optional[str] = None
    if return_json and filename and extraction_service:
        extracted = await event_loop.run_in_executor(
            _thread_pool,
            extraction_service.extract_structured,
            filename,
            body_text,
            mappings,
            raw_data,
        )
        if "error" in extracted:
            failure = extracted["error"]
        else:
            structured = extracted

    # Step 3: metadata. The result's own statistics describe the uncleaned
    # markdown, so they are recomputed only when cleaning changed the text.
    cleaning_changed_text = clean_content and body_text != original_markdown
    if not cleaning_changed_text:
        metadata = _build_metadata(result)
    else:
        digest = hashlib.sha256(body_text.encode()).hexdigest()
        metadata = ConversionMetadata(
            source=result.source,
            char_count=len(body_text),
            word_count=len(body_text.split()),
            line_count=len(body_text.splitlines()),
            file_size_bytes=result.file_size_bytes,
            mime_type=result.mime_type,
            content_hash=digest,
            token_estimate=max(1, count_tokens(body_text)),
        )

    return ConversionResponse(
        success=True,
        time_ms=round(result.duration_ms, 3),
        content=body_text,
        return_json=return_json,
        json_content=structured,
        metadata=metadata,
        error_message=failure,
    )
def _raise_for_error(outcome: ConversionError) -> None:
    """Translate a failed conversion into an ``HTTPException``.

    The HTTP status is chosen from ``outcome.error_type``; any type not
    listed below is reported as 500.

    Args:
        outcome: The conversion failure to report.

    Raises:
        HTTPException: Always. The detail carries the error type, message
            and duration rounded to three decimals.
    """
    kind = outcome.error_type
    if kind == "FileNotFoundError":
        http_status = status.HTTP_404_NOT_FOUND
    elif kind == "ValueError":
        http_status = status.HTTP_422_UNPROCESSABLE_ENTITY
    elif kind == "PermissionError":
        http_status = status.HTTP_403_FORBIDDEN
    else:
        http_status = status.HTTP_500_INTERNAL_SERVER_ERROR

    payload = {
        "success": False,
        "error_type": kind,
        "message": outcome.message,
        "time_ms": round(outcome.duration_ms, 3),
    }
    raise HTTPException(status_code=http_status, detail=payload)
@router.post(
    "/convert/file",
    response_model=ConversionResponse,
    summary="Convert uploaded file to Markdown",
)
async def convert_file(
    file: Annotated[UploadFile, File(description="File to convert")],
    plain_text: bool = Form(False),
    return_json: bool = Form(False),
    clean_content: bool = Query(False),
    mappings: Optional[str] = Form(None, description="JSON string with field mappings"),
    token: str = Depends(require_auth),
    converter_service: ConverterService = Depends(get_converter_service),
    extraction_service: ExtractionService = Depends(get_extraction_service),
    text_cleaner_service: TextCleanerService = Depends(get_text_cleaner_service),
):
    """Convert an uploaded file to Markdown.

    Returns the Markdown as plain text when ``plain_text`` is set, otherwise a
    ``ConversionResponse``. ``clean_content`` runs the text cleaner on the
    Markdown; ``return_json`` additionally requests structured extraction,
    optionally guided by ``mappings`` (a JSON string).

    Raises:
        HTTPException: 400 for a missing file or invalid ``mappings`` JSON,
            413 when the upload exceeds the configured size limit, and the
            status chosen by ``_raise_for_error`` when conversion fails.
    """
    if file is None:
        raise HTTPException(status_code=400, detail={"success": False, "message": "No file provided."})

    parsed_mappings = None
    if mappings:
        try:
            parsed_mappings = json_mod.loads(mappings)
        except json_mod.JSONDecodeError as exc:
            # Chain the cause so the original parse error stays in tracebacks.
            raise HTTPException(
                status_code=400,
                detail={"success": False, "message": "Invalid JSON in mappings parameter."},
            ) from exc

    _logger.info("Received request to convert file: %s", file.filename)

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without pulling the whole file into memory first.
    raw = await file.read(int(_MAX_UPLOAD_BYTES) + 1)
    if len(raw) > _MAX_UPLOAD_BYTES:
        _logger.error("File %s exceeds %s MB limit", file.filename, _settings.max_upload_mb)
        raise HTTPException(
            status_code=413,
            detail={"success": False, "message": f"File exceeds {_settings.max_upload_mb} MB limit."},
        )

    loop = asyncio.get_running_loop()
    outcome = await loop.run_in_executor(
        _thread_pool, converter_service.convert_stream, raw, file.filename or "upload"
    )
    if isinstance(outcome, ConversionError):
        _logger.error("Conversion failed for %s: %s", file.filename, outcome.message)
        _raise_for_error(outcome)
    _logger.info("Conversion successful for %s", file.filename)

    if plain_text:
        from fastapi.responses import PlainTextResponse

        # Cleaning is done here only for the plain-text reply; the structured
        # path below lets _build_response clean, so the cleaner runs once.
        content = outcome.markdown
        if clean_content:
            content = await loop.run_in_executor(_thread_pool, text_cleaner_service.clean, content)
        return PlainTextResponse(content)

    response = await _build_response(
        outcome,
        return_json=return_json,
        filename=file.filename,
        raw_data=raw,
        mappings=parsed_mappings,
        extraction_service=extraction_service,
        clean_content=clean_content,
        text_cleaner_service=text_cleaner_service,
    )
    _logger.info("Request completed for %s", file.filename)
    return response
async def _fetch_url_bytes(url: str) -> bytes:
    """Download ``url`` and return its body, enforcing the upload size limit.

    The body is streamed so that an oversized response is rejected as soon as
    the limit is crossed instead of after it has been fully buffered.

    Raises:
        HTTPException: 413 when the body exceeds the configured limit, 400
            when the request fails with an ``httpx.HTTPError``.
    """
    chunks = []
    received = 0
    try:
        async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
            async with client.stream("GET", url) as resp:
                resp.raise_for_status()
                async for chunk in resp.aiter_bytes():
                    received += len(chunk)
                    if received > _MAX_UPLOAD_BYTES:
                        _logger.error("URL %s exceeds %s MB limit", url, _settings.max_upload_mb)
                        raise HTTPException(
                            status_code=413,
                            detail={
                                "success": False,
                                "message": f"File exceeds {_settings.max_upload_mb} MB limit.",
                            },
                        )
                    chunks.append(chunk)
    except httpx.HTTPError as exc:
        _logger.error("Failed to fetch URL %s: %s", url, exc)
        raise HTTPException(
            status_code=400,
            detail={"success": False, "message": f"Failed to fetch URL: {exc}"},
        ) from exc
    return b"".join(chunks)


@router.post(
    "/convert/url",
    response_model=ConversionResponse,
    summary="Convert a URL to Markdown",
)
async def convert_url(
    body: UrlRequest,
    clean_content: bool = Query(False),
    token: str = Depends(require_auth),
    converter_service: ConverterService = Depends(get_converter_service),
    extraction_service: ExtractionService = Depends(get_extraction_service),
    text_cleaner_service: TextCleanerService = Depends(get_text_cleaner_service),
):
    """Convert the document at ``body.url`` to Markdown.

    When ``body.return_json`` is set, the URL is downloaded here so the raw
    bytes can be handed to structured extraction; otherwise the converter
    service fetches the URL itself. ``clean_content`` runs the text cleaner
    on the Markdown.

    Raises:
        HTTPException: 400 when the download fails, 413 when the downloaded
            body exceeds the configured size limit, and the status chosen by
            ``_raise_for_error`` when conversion fails.
    """
    # NOTE(review): body.url is caller-supplied and is fetched server-side
    # (redirects followed on the download path). Nothing visible here limits
    # the scheme or destination host -- confirm that UrlRequest validation or
    # the network layer protects against SSRF / local-resource access.
    _logger.info("Received request to convert URL: %s", body.url)
    filename = urlparse(body.url).path.split("/")[-1] or "url_content"

    loop = asyncio.get_running_loop()
    raw_data: Optional[bytes] = None
    if body.return_json:
        _logger.info("Fetching URL for JSON extraction: %s", body.url)
        # Only the download is mapped to "Failed to fetch URL"; errors from
        # conversion or extraction are no longer misreported as fetch errors.
        raw_data = await _fetch_url_bytes(body.url)
        outcome = await loop.run_in_executor(
            _thread_pool, converter_service.convert_stream, raw_data, filename
        )
    else:
        outcome = await loop.run_in_executor(_thread_pool, converter_service.convert_url, body.url)

    if isinstance(outcome, ConversionError):
        _logger.error("Conversion failed for URL %s: %s", body.url, outcome.message)
        _raise_for_error(outcome)
    _logger.info("Conversion successful for URL %s", body.url)

    response = await _build_response(
        outcome,
        return_json=body.return_json,
        filename=filename,
        raw_data=raw_data,
        mappings=body.mappings,
        extraction_service=extraction_service,
        clean_content=clean_content,
        text_cleaner_service=text_cleaner_service,
    )
    _logger.info("Request completed for URL %s", body.url)
    return response