llm-ready-data / app /api /v1 /batch.py
light-infer-chat's picture
ok
7d1ad3f
Raw
History Blame Contribute Delete
9.13 kB
from __future__ import annotations
import asyncio
import concurrent.futures
import json as json_mod
import os
import time
from typing import Annotated, List, Optional
import httpx
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile
from app.api.deps import get_converter_service, get_extraction_service, get_text_cleaner_service, require_auth
from app.config import get_settings
from app.core.logger import get_logger
from app.models.domain import ConversionError
from app.models.schemas import (
BatchFileResult,
BatchResponse,
BatchUrlRequest,
ConversionMetadata,
)
from app.services.converter_service import ConverterService
from app.services.extraction_service import ExtractionService
from app.services.text_cleaner_service import TextCleanerService
# Router that exposes the batch conversion endpoints defined below.
router = APIRouter()
_logger = get_logger(__name__)

# Limits are captured once, at import time, from the application settings.
_settings = get_settings()
_MAX_UPLOAD_BYTES, _MAX_BATCH_FILES = _settings.max_upload_bytes, _settings.max_batch_files

# Worker count: CPU count (1 if unknown) plus 4, capped at 32.
_MAX_WORKERS = min((os.cpu_count() or 1) + 4, 32)
# Shared pool used to run the blocking converter / cleaner / extractor calls
# off the event loop. No shutdown of this pool is visible in this module.
_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=_MAX_WORKERS)
def _build_metadata(result) -> ConversionMetadata:
    """Copy the metadata attributes of a conversion result into the API schema.

    ``result`` must expose every attribute named in ``copied`` below
    (presumably the successful conversion object returned by the converter
    service — TODO confirm its concrete type).
    """
    copied = (
        "source",
        "char_count",
        "word_count",
        "line_count",
        "file_size_bytes",
        "mime_type",
        "content_hash",
        "token_estimate",
    )
    # Attributes are read in the same order as the schema fields are listed.
    return ConversionMetadata(**{attr: getattr(result, attr) for attr in copied})
def _batch_result_from_error(name: str, err: ConversionError) -> BatchFileResult:
    """Build a failed batch entry for ``name`` from a conversion error.

    The elapsed time is taken from ``err.duration_ms`` (rounded to 3 decimals)
    and the error text from ``err.message``.
    """
    elapsed_ms = round(err.duration_ms, 3)
    return BatchFileResult(filename=name, success=False, time_ms=elapsed_ms, error=err.message)
def _batch_result_from_ok(result) -> BatchFileResult:
    """Build a successful batch entry from a conversion result.

    The entry is named after ``result.source``, carries the raw
    ``result.markdown`` as its content, and embeds the metadata produced by
    ``_build_metadata``.
    """
    entry_fields = dict(
        filename=result.source,
        success=True,
        time_ms=round(result.duration_ms, 3),
        content=result.markdown,
        metadata=_build_metadata(result),
    )
    return BatchFileResult(**entry_fields)
@router.post(
    "/batch/files",
    response_model=BatchResponse,
    summary=f"Convert multiple files (up to {_MAX_BATCH_FILES})",
)
async def batch_files(
    files: Annotated[List[UploadFile], File(description="Files to convert")],
    return_json: bool = Form(False),
    clean_content: bool = Query(False),
    mappings: Optional[str] = Form(None, description="JSON string with field mappings"),
    token: str = Depends(require_auth),
    converter_service: ConverterService = Depends(get_converter_service),
    extraction_service: ExtractionService = Depends(get_extraction_service),
    text_cleaner_service: TextCleanerService = Depends(get_text_cleaner_service),
):
    """Convert several uploaded files to markdown concurrently.

    Each file is converted independently. It can optionally be cleaned
    (``clean_content``) and run through structured extraction
    (``return_json``, with optional JSON-encoded field ``mappings``).
    Per-file problems (oversized upload, conversion error, unexpected
    exception) are reported in that file's result entry and do not abort
    the rest of the batch.

    Responds with HTTP 400 when no files are sent, when more than the
    configured maximum number of files is sent, or when ``mappings`` is not
    valid JSON.
    """
    if not files:
        raise HTTPException(status_code=400, detail={"success": False, "message": "No files provided."})
    if len(files) > _MAX_BATCH_FILES:
        raise HTTPException(
            status_code=400,
            detail={"success": False, "message": f"Maximum {_MAX_BATCH_FILES} files per batch."},
        )

    parsed_mappings = None
    if mappings:
        try:
            parsed_mappings = json_mod.loads(mappings)
        except json_mod.JSONDecodeError:
            raise HTTPException(
                status_code=400,
                detail={"success": False, "message": "Invalid JSON in mappings parameter."},
            ) from None

    batch_start = time.perf_counter()
    loop = asyncio.get_running_loop()

    async def process_single_file(f: UploadFile) -> BatchFileResult:
        """Convert one upload; expected failures become a failed result entry."""
        if f is None:
            return BatchFileResult(filename="unknown", success=False, time_ms=0, error="File object is None.")
        name = f.filename or "unknown"
        _logger.info("Batch processing file: %s", f.filename)
        # Read at most one byte past the limit. That is enough to detect an
        # oversized upload without pulling the whole file into memory.
        raw = await f.read(_MAX_UPLOAD_BYTES + 1)
        if len(raw) > _MAX_UPLOAD_BYTES:
            return BatchFileResult(
                filename=name,
                success=False,
                time_ms=0,
                error=f"File exceeds {_settings.max_upload_mb} MB limit.",
            )
        outcome = await loop.run_in_executor(
            _thread_pool, converter_service.convert_stream, raw, f.filename or "upload"
        )
        if isinstance(outcome, ConversionError):
            return _batch_result_from_error(name, outcome)
        result = _batch_result_from_ok(outcome)
        if clean_content:
            result.content = await loop.run_in_executor(
                _thread_pool, text_cleaner_service.clean, outcome.markdown
            )
        if return_json and f.filename:
            # Extraction is given the uncleaned markdown plus the raw bytes.
            json_result = await loop.run_in_executor(
                _thread_pool,
                extraction_service.extract_structured,
                f.filename,
                outcome.markdown,
                parsed_mappings,
                raw,
            )
            if "error" in json_result:
                result.json_content = None
                result.error = json_result.get("error")
            else:
                result.json_content = json_result
                result.error = None
        return result

    async def guarded(f: UploadFile) -> BatchFileResult:
        """Run process_single_file, turning unexpected exceptions into a failed entry."""
        try:
            return await process_single_file(f)
        except Exception:
            # Per-item boundary: without it, one raising task would fail the
            # whole request via gather() while its siblings keep running.
            display_name = getattr(f, "filename", None)
            _logger.exception("Batch file processing failed: %s", display_name)
            return BatchFileResult(
                filename=display_name or "unknown",
                success=False,
                time_ms=0,
                error="Unexpected error while processing file.",
            )

    results = await asyncio.gather(*(guarded(f) for f in files))
    total_ms = round((time.perf_counter() - batch_start) * 1000, 3)
    succeeded = sum(1 for r in results if r.success)
    _logger.info("Batch files completed. Succeeded: %s/%s", succeeded, len(results))
    return BatchResponse(
        total=len(results),
        succeeded=succeeded,
        failed=len(results) - succeeded,
        total_time_ms=total_ms,
        results=results,
    )
@router.post(
    "/batch/urls",
    response_model=BatchResponse,
    summary="Convert multiple URLs (up to 20)",
)
async def batch_urls(
    body: BatchUrlRequest,
    clean_content: bool = Query(False),
    token: str = Depends(require_auth),
    converter_service: ConverterService = Depends(get_converter_service),
    extraction_service: ExtractionService = Depends(get_extraction_service),
    text_cleaner_service: TextCleanerService = Depends(get_text_cleaner_service),
):
    """Convert several URLs to markdown concurrently.

    Without ``return_json`` in the request body, each URL is handed to the
    converter's ``convert_url``. With it, the URL is downloaded here (capped
    at the upload size limit), converted from the raw bytes, and also run
    through structured extraction with the request's ``mappings``.
    ``clean_content`` optionally cleans the markdown. Per-URL failures are
    reported in that URL's result entry and do not abort the rest of the
    batch.
    """
    from urllib.parse import urlparse

    batch_start = time.perf_counter()
    loop = asyncio.get_running_loop()

    async def fetch_limited(url: str) -> Optional[bytes]:
        """Download ``url``; return None as soon as the body exceeds the upload limit."""
        # NOTE(review): this fetches caller-supplied URLs and follows redirects,
        # so it can reach internal hosts (SSRF) unless BatchUrlRequest or the
        # network layer restricts targets — confirm.
        async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
            async with client.stream("GET", url) as resp:
                resp.raise_for_status()
                received = bytearray()
                # Stream instead of reading resp.content, so an oversized
                # response is abandoned rather than buffered in full.
                async for chunk in resp.aiter_bytes():
                    received += chunk
                    if len(received) > _MAX_UPLOAD_BYTES:
                        return None
                return bytes(received)

    async def convert_downloaded(url: str) -> BatchFileResult:
        """return_json path: download, convert the bytes, optionally clean, then extract."""
        filename = urlparse(url).path.split("/")[-1] or "url_content"
        # Failure entries are keyed by the URL itself, matching the
        # conversion-error entries, so each failing input stays identifiable.
        try:
            raw_data = await fetch_limited(url)
        except (httpx.HTTPError, httpx.InvalidURL) as exc:
            # httpx.InvalidURL is not an httpx.HTTPError subclass.
            return BatchFileResult(
                filename=url, success=False, time_ms=0,
                error=f"Failed to fetch URL: {exc}",
            )
        if raw_data is None:
            return BatchFileResult(
                filename=url, success=False, time_ms=0,
                error=f"File exceeds {_settings.max_upload_mb} MB limit.",
            )
        outcome = await loop.run_in_executor(
            _thread_pool, converter_service.convert_stream, raw_data, filename
        )
        if isinstance(outcome, ConversionError):
            return _batch_result_from_error(url, outcome)
        result = _batch_result_from_ok(outcome)
        if clean_content:
            result.content = await loop.run_in_executor(
                _thread_pool, text_cleaner_service.clean, outcome.markdown
            )
        # Extraction is given the uncleaned markdown plus the raw bytes.
        json_result = await loop.run_in_executor(
            _thread_pool,
            extraction_service.extract_structured,
            filename,
            outcome.markdown,
            body.mappings,
            raw_data,
        )
        if "error" in json_result:
            result.json_content = None
            result.error = json_result.get("error")
        else:
            result.json_content = json_result
            result.error = None
        return result

    async def process_single_url(url: str) -> BatchFileResult:
        """Convert one URL via the path selected by the request's return_json flag."""
        _logger.info("Batch processing URL: %s", url)
        if body.return_json:
            return await convert_downloaded(url)
        outcome = await loop.run_in_executor(_thread_pool, converter_service.convert_url, url)
        if isinstance(outcome, ConversionError):
            return _batch_result_from_error(url, outcome)
        result = _batch_result_from_ok(outcome)
        if clean_content:
            result.content = await loop.run_in_executor(
                _thread_pool, text_cleaner_service.clean, outcome.markdown
            )
        return result

    async def guarded(url: str) -> BatchFileResult:
        """Run process_single_url, turning unexpected exceptions into a failed entry."""
        try:
            return await process_single_url(url)
        except Exception:
            # Per-item boundary: without it, one raising task would fail the
            # whole request via gather() while its siblings keep running.
            _logger.exception("Batch URL processing failed: %s", url)
            return BatchFileResult(
                filename=url, success=False, time_ms=0,
                error="Unexpected error while processing URL.",
            )

    results = await asyncio.gather(*(guarded(url) for url in body.urls))
    total_ms = round((time.perf_counter() - batch_start) * 1000, 3)
    succeeded = sum(1 for r in results if r.success)
    _logger.info("Batch URLs completed. Succeeded: %s/%s", succeeded, len(results))
    return BatchResponse(
        total=len(results),
        succeeded=succeeded,
        failed=len(results) - succeeded,
        total_time_ms=total_ms,
        results=results,
    )