from __future__ import annotations import asyncio import concurrent.futures import json as json_mod import os import time from typing import Annotated, List, Optional import httpx from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile from app.api.deps import get_converter_service, get_extraction_service, get_text_cleaner_service, require_auth from app.config import get_settings from app.core.logger import get_logger from app.models.domain import ConversionError from app.models.schemas import ( BatchFileResult, BatchResponse, BatchUrlRequest, ConversionMetadata, ) from app.services.converter_service import ConverterService from app.services.extraction_service import ExtractionService from app.services.text_cleaner_service import TextCleanerService router = APIRouter() _logger = get_logger(__name__) _settings = get_settings() _MAX_UPLOAD_BYTES = _settings.max_upload_bytes _MAX_BATCH_FILES = _settings.max_batch_files _MAX_WORKERS = min(32, (os.cpu_count() or 1) + 4) _thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=_MAX_WORKERS) def _build_metadata(result) -> ConversionMetadata: return ConversionMetadata( source=result.source, char_count=result.char_count, word_count=result.word_count, line_count=result.line_count, file_size_bytes=result.file_size_bytes, mime_type=result.mime_type, content_hash=result.content_hash, token_estimate=result.token_estimate, ) def _batch_result_from_error(name: str, err: ConversionError) -> BatchFileResult: return BatchFileResult( filename=name, success=False, time_ms=round(err.duration_ms, 3), error=err.message, ) def _batch_result_from_ok(result) -> BatchFileResult: return BatchFileResult( filename=result.source, success=True, time_ms=round(result.duration_ms, 3), content=result.markdown, metadata=_build_metadata(result), ) @router.post( "/batch/files", response_model=BatchResponse, summary="Convert multiple files (up to 10)", ) async def batch_files( files: Annotated[List[UploadFile], File(description="Files to convert")], return_json: bool = Form(False), clean_content: bool = Query(False), mappings: Optional[str] = Form(None, description="JSON string with field mappings"), token: str = Depends(require_auth), converter_service: ConverterService = Depends(get_converter_service), extraction_service: ExtractionService = Depends(get_extraction_service), text_cleaner_service: TextCleanerService = Depends(get_text_cleaner_service), ): if not files: raise HTTPException(status_code=400, detail={"success": False, "message": "No files provided."}) if len(files) > _MAX_BATCH_FILES: raise HTTPException(status_code=400, detail={"success": False, "message": f"Maximum {_MAX_BATCH_FILES} files per batch."}) parsed_mappings = None if mappings: try: parsed_mappings = json_mod.loads(mappings) except json_mod.JSONDecodeError: raise HTTPException(status_code=400, detail={"success": False, "message": "Invalid JSON in mappings parameter."}) batch_start = time.perf_counter() async def process_single_file(f: UploadFile) -> BatchFileResult: if f is None: return BatchFileResult(filename="unknown", success=False, time_ms=0, error="File object is None.") _logger.info("Batch processing file: %s", f.filename) raw = await f.read() if len(raw) > _MAX_UPLOAD_BYTES: return BatchFileResult( filename=f.filename or "unknown", success=False, time_ms=0, error=f"File exceeds {_settings.max_upload_mb} MB limit.", ) loop = asyncio.get_running_loop() outcome = await loop.run_in_executor(_thread_pool, converter_service.convert_stream, raw, f.filename or "upload") if isinstance(outcome, ConversionError): return _batch_result_from_error(f.filename or "unknown", outcome) content = outcome.markdown if clean_content: loop2 = asyncio.get_running_loop() content = await loop2.run_in_executor(_thread_pool, text_cleaner_service.clean, content) result = _batch_result_from_ok(outcome) if clean_content: result.content = content if return_json and f.filename: loop = asyncio.get_running_loop() json_result = await loop.run_in_executor( _thread_pool, extraction_service.extract_structured, f.filename, outcome.markdown, parsed_mappings, raw, ) result.json_content = json_result if "error" not in json_result else None result.error = json_result.get("error") if "error" in json_result else None return result tasks = [process_single_file(f) for f in files] results = await asyncio.gather(*tasks) total_ms = round((time.perf_counter() - batch_start) * 1000, 3) succeeded = sum(1 for r in results if r.success) _logger.info("Batch files completed. Succeeded: %s/%s", succeeded, len(results)) return BatchResponse( total=len(results), succeeded=succeeded, failed=len(results) - succeeded, total_time_ms=total_ms, results=results, ) @router.post( "/batch/urls", response_model=BatchResponse, summary="Convert multiple URLs (up to 20)", ) async def batch_urls( body: BatchUrlRequest, clean_content: bool = Query(False), token: str = Depends(require_auth), converter_service: ConverterService = Depends(get_converter_service), extraction_service: ExtractionService = Depends(get_extraction_service), text_cleaner_service: TextCleanerService = Depends(get_text_cleaner_service), ): batch_start = time.perf_counter() async def process_single_url(url: str) -> BatchFileResult: _logger.info("Batch processing URL: %s", url) from urllib.parse import urlparse parsed = urlparse(url) filename = parsed.path.split("/")[-1] or "url_content" if body.return_json: try: async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client: resp = await client.get(url) resp.raise_for_status() raw_data = resp.content if len(raw_data) > _MAX_UPLOAD_BYTES: return BatchFileResult( filename=filename, success=False, time_ms=0, error=f"File exceeds {_settings.max_upload_mb} MB limit.", ) loop = asyncio.get_running_loop() outcome = await loop.run_in_executor(_thread_pool, converter_service.convert_stream, raw_data, filename) if isinstance(outcome, ConversionError): return _batch_result_from_error(url, outcome) result = _batch_result_from_ok(outcome) if clean_content: loop2 = asyncio.get_running_loop() result.content = await loop2.run_in_executor(_thread_pool, text_cleaner_service.clean, outcome.markdown) loop = asyncio.get_running_loop() json_result = await loop.run_in_executor( _thread_pool, extraction_service.extract_structured, filename, outcome.markdown, body.mappings, raw_data, ) result.json_content = json_result if "error" not in json_result else None result.error = json_result.get("error") if "error" in json_result else None return result except httpx.HTTPError as exc: return BatchFileResult( filename=filename, success=False, time_ms=0, error=f"Failed to fetch URL: {exc}", ) loop = asyncio.get_running_loop() outcome = await loop.run_in_executor(_thread_pool, converter_service.convert_url, url) if isinstance(outcome, ConversionError): return _batch_result_from_error(url, outcome) result = _batch_result_from_ok(outcome) if clean_content: result.content = await loop.run_in_executor(_thread_pool, text_cleaner_service.clean, outcome.markdown) return result tasks = [process_single_url(url) for url in body.urls] results = await asyncio.gather(*tasks) total_ms = round((time.perf_counter() - batch_start) * 1000, 3) succeeded = sum(1 for r in results if r.success) _logger.info("Batch URLs completed. Succeeded: %s/%s", succeeded, len(results)) return BatchResponse( total=len(results), succeeded=succeeded, failed=len(results) - succeeded, total_time_ms=total_ms, results=results, )