Spaces:
Running
Running
| from __future__ import annotations | |
| import asyncio | |
| import concurrent.futures | |
| import json as json_mod | |
| import os | |
| import time | |
| from typing import Annotated, List, Optional | |
| import httpx | |
| from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile | |
| from app.api.deps import get_converter_service, get_extraction_service, get_text_cleaner_service, require_auth | |
| from app.config import get_settings | |
| from app.core.logger import get_logger | |
| from app.models.domain import ConversionError | |
| from app.models.schemas import ( | |
| BatchFileResult, | |
| BatchResponse, | |
| BatchUrlRequest, | |
| ConversionMetadata, | |
| ) | |
| from app.services.converter_service import ConverterService | |
| from app.services.extraction_service import ExtractionService | |
| from app.services.text_cleaner_service import TextCleanerService | |
router = APIRouter()
_logger = get_logger(__name__)

# Settings are resolved once, at import time; the limits below are snapshots of them.
_settings = get_settings()
_MAX_UPLOAD_BYTES = _settings.max_upload_bytes
_MAX_BATCH_FILES = _settings.max_batch_files

# Shared worker pool for the blocking convert / clean / extract calls issued by the
# batch endpoints. The sizing rule (CPU count + 4, capped at 32) is the same one
# ThreadPoolExecutor uses for its own default since Python 3.8.
_MAX_WORKERS = min(32, 4 + (os.cpu_count() or 1))
_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=_MAX_WORKERS)
def _build_metadata(result) -> ConversionMetadata:
    """Copy the metadata attributes of a successful conversion into a ConversionMetadata.

    Args:
        result: A successful conversion outcome exposing the attributes listed below.

    Returns:
        A ``ConversionMetadata`` whose fields mirror the same-named attributes of *result*.
    """
    copied_fields = (
        "source",
        "char_count",
        "word_count",
        "line_count",
        "file_size_bytes",
        "mime_type",
        "content_hash",
        "token_estimate",
    )
    return ConversionMetadata(**{field: getattr(result, field) for field in copied_fields})
def _batch_result_from_error(name: str, err: ConversionError) -> BatchFileResult:
    """Wrap a ConversionError as a failed per-item batch result.

    Args:
        name: Label reported in the result's ``filename`` field.
        err: The conversion failure; its ``message`` and ``duration_ms`` are copied.

    Returns:
        A ``BatchFileResult`` with ``success=False`` and the elapsed time rounded to 3 decimals.
    """
    elapsed_ms = round(err.duration_ms, 3)
    return BatchFileResult(filename=name, success=False, time_ms=elapsed_ms, error=err.message)
def _batch_result_from_ok(result) -> BatchFileResult:
    """Wrap a successful conversion outcome as a per-item batch result.

    Args:
        result: Conversion outcome providing ``source``, ``duration_ms``, ``markdown``
            and the attributes consumed by ``_build_metadata``.

    Returns:
        A ``BatchFileResult`` with ``success=True``, the markdown as ``content`` and
        the derived metadata attached.
    """
    metadata = _build_metadata(result)
    return BatchFileResult(
        filename=result.source,
        success=True,
        time_ms=round(result.duration_ms, 3),
        content=result.markdown,
        metadata=metadata,
    )
async def batch_files(
    files: Annotated[List[UploadFile], File(description="Files to convert")],
    return_json: bool = Form(False),
    clean_content: bool = Query(False),
    mappings: Optional[str] = Form(None, description="JSON string with field mappings"),
    token: str = Depends(require_auth),
    converter_service: ConverterService = Depends(get_converter_service),
    extraction_service: ExtractionService = Depends(get_extraction_service),
    text_cleaner_service: TextCleanerService = Depends(get_text_cleaner_service),
):
    """Convert a batch of uploaded files concurrently.

    Every file is converted with ``converter_service.convert_stream`` on the shared
    thread pool, and all files run concurrently via ``asyncio.gather``.

    Args:
        files: Uploaded files; must be non-empty and at most ``_MAX_BATCH_FILES`` long.
        return_json: When true, also run ``extraction_service.extract_structured``
            for each file that has a filename.
        clean_content: When true, replace each result's ``content`` with the output
            of ``text_cleaner_service.clean``.
        mappings: Optional JSON string; parsed and forwarded to the extractor.
        token: Resolved by ``require_auth``; not otherwise used here.
        converter_service / extraction_service / text_cleaner_service: Injected services.

    Returns:
        A ``BatchResponse`` with per-file results, success/failure counts and the
        total wall-clock time in milliseconds.

    Raises:
        HTTPException: 400 when no files are given, too many files are given, or
            ``mappings`` is not valid JSON.

    A failure in one file is reported in that file's result and never aborts the
    batch. Failures include an oversized upload, a ``ConversionError``, or an
    unexpected exception. An extraction error is placed in the result's ``error``
    field while ``success`` stays True.
    """
    if not files:
        raise HTTPException(status_code=400, detail={"success": False, "message": "No files provided."})
    if len(files) > _MAX_BATCH_FILES:
        raise HTTPException(
            status_code=400,
            detail={"success": False, "message": f"Maximum {_MAX_BATCH_FILES} files per batch."},
        )

    parsed_mappings = None
    if mappings:
        try:
            parsed_mappings = json_mod.loads(mappings)
        except json_mod.JSONDecodeError:
            raise HTTPException(
                status_code=400,
                detail={"success": False, "message": "Invalid JSON in mappings parameter."},
            ) from None

    batch_start = time.perf_counter()
    loop = asyncio.get_running_loop()

    async def convert_one(f: UploadFile, display_name: str) -> BatchFileResult:
        """Read, convert, optionally clean and optionally extract a single upload."""
        # Read at most one byte past the limit. That is enough to detect an
        # oversized upload without pulling the whole file into memory.
        raw = await f.read(_MAX_UPLOAD_BYTES + 1)
        if len(raw) > _MAX_UPLOAD_BYTES:
            return BatchFileResult(
                filename=display_name,
                success=False,
                time_ms=0,
                error=f"File exceeds {_settings.max_upload_mb} MB limit.",
            )

        outcome = await loop.run_in_executor(
            _thread_pool, converter_service.convert_stream, raw, f.filename or "upload"
        )
        if isinstance(outcome, ConversionError):
            return _batch_result_from_error(display_name, outcome)

        result = _batch_result_from_ok(outcome)
        if clean_content:
            result.content = await loop.run_in_executor(
                _thread_pool, text_cleaner_service.clean, outcome.markdown
            )

        if return_json and f.filename:
            # Extraction receives the uncleaned markdown plus the raw upload bytes.
            json_result = await loop.run_in_executor(
                _thread_pool,
                extraction_service.extract_structured,
                f.filename,
                outcome.markdown,
                parsed_mappings,
                raw,
            )
            if "error" in json_result:
                result.json_content = None
                result.error = json_result.get("error")
            else:
                result.json_content = json_result
                result.error = None
        return result

    async def process_single_file(f: UploadFile) -> BatchFileResult:
        """Process one upload, converting any unexpected exception into a failed result."""
        if f is None:
            return BatchFileResult(filename="unknown", success=False, time_ms=0, error="File object is None.")
        _logger.info("Batch processing file: %s", f.filename)
        display_name = f.filename or "unknown"
        started = time.perf_counter()
        try:
            return await convert_one(f, display_name)
        except Exception:
            # Isolate failures. Otherwise a single unexpected exception makes
            # asyncio.gather raise, and the whole batch fails with a 500.
            _logger.exception("Unexpected error while batch processing file: %s", display_name)
            return BatchFileResult(
                filename=display_name,
                success=False,
                time_ms=round((time.perf_counter() - started) * 1000, 3),
                error="Unexpected error while processing file.",
            )

    results = await asyncio.gather(*(process_single_file(f) for f in files))
    total_ms = round((time.perf_counter() - batch_start) * 1000, 3)
    succeeded = sum(1 for r in results if r.success)
    _logger.info("Batch files completed. Succeeded: %s/%s", succeeded, len(results))
    return BatchResponse(
        total=len(results),
        succeeded=succeeded,
        failed=len(results) - succeeded,
        total_time_ms=total_ms,
        results=results,
    )
async def batch_urls(
    body: BatchUrlRequest,
    clean_content: bool = Query(False),
    token: str = Depends(require_auth),
    converter_service: ConverterService = Depends(get_converter_service),
    extraction_service: ExtractionService = Depends(get_extraction_service),
    text_cleaner_service: TextCleanerService = Depends(get_text_cleaner_service),
):
    """Convert a batch of URLs concurrently.

    The path is selected by ``body.return_json``:

    * false: ``converter_service.convert_url`` is given each URL directly.
    * true: the URL is downloaded here and converted with ``convert_stream``.
      The download uses a 30 s timeout, follows redirects, and caps the body at
      ``_MAX_UPLOAD_BYTES``. The result is then passed to
      ``extraction_service.extract_structured`` together with ``body.mappings``.

    In both paths ``clean_content`` replaces ``content`` with the output of
    ``text_cleaner_service.clean``.

    A failure in one URL is returned in that URL's result and never aborts the
    batch. Failures include fetch errors, oversized bodies, a ``ConversionError``,
    or an unexpected exception.

    NOTE(review): URLs come from the request body and are fetched server-side with
    redirects followed. If untrusted callers can reach this endpoint, consider
    restricting target hosts (SSRF). ``body.urls`` is not size-checked here;
    presumably ``BatchUrlRequest`` bounds it — confirm, since every URL runs
    concurrently.

    Returns:
        A ``BatchResponse`` with per-URL results, success/failure counts and the
        total wall-clock time in milliseconds.
    """
    from urllib.parse import urlparse

    batch_start = time.perf_counter()
    loop = asyncio.get_running_loop()

    async def fetch_limited(url: str) -> Optional[bytes]:
        """Download *url*; return None as soon as the body exceeds the upload limit."""
        async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
            async with client.stream("GET", url) as resp:
                resp.raise_for_status()
                buffer = bytearray()
                # Stream instead of reading resp.content, so an oversized response
                # is abandoned early rather than fully buffered in memory.
                async for chunk in resp.aiter_bytes():
                    buffer.extend(chunk)
                    if len(buffer) > _MAX_UPLOAD_BYTES:
                        return None
                return bytes(buffer)

    async def convert_with_extraction(url: str, filename: str) -> BatchFileResult:
        """Fetch *url* here, convert the bytes, then run structured extraction."""
        try:
            raw_data = await fetch_limited(url)
        except (httpx.HTTPError, httpx.InvalidURL) as exc:
            # InvalidURL is not an HTTPError subclass, so it is listed explicitly.
            return BatchFileResult(
                filename=filename, success=False, time_ms=0,
                error=f"Failed to fetch URL: {exc}",
            )
        if raw_data is None:
            return BatchFileResult(
                filename=filename, success=False, time_ms=0,
                error=f"File exceeds {_settings.max_upload_mb} MB limit.",
            )

        outcome = await loop.run_in_executor(
            _thread_pool, converter_service.convert_stream, raw_data, filename
        )
        if isinstance(outcome, ConversionError):
            return _batch_result_from_error(url, outcome)

        result = _batch_result_from_ok(outcome)
        if clean_content:
            result.content = await loop.run_in_executor(
                _thread_pool, text_cleaner_service.clean, outcome.markdown
            )

        # Extraction receives the uncleaned markdown plus the downloaded bytes.
        json_result = await loop.run_in_executor(
            _thread_pool,
            extraction_service.extract_structured,
            filename,
            outcome.markdown,
            body.mappings,
            raw_data,
        )
        if "error" in json_result:
            result.json_content = None
            result.error = json_result.get("error")
        else:
            result.json_content = json_result
            result.error = None
        return result

    async def convert_remote(url: str) -> BatchFileResult:
        """Let the converter service fetch and convert *url* itself."""
        outcome = await loop.run_in_executor(_thread_pool, converter_service.convert_url, url)
        if isinstance(outcome, ConversionError):
            return _batch_result_from_error(url, outcome)
        result = _batch_result_from_ok(outcome)
        if clean_content:
            result.content = await loop.run_in_executor(
                _thread_pool, text_cleaner_service.clean, outcome.markdown
            )
        return result

    async def process_single_url(url: str) -> BatchFileResult:
        """Process one URL, converting any unexpected exception into a failed result."""
        _logger.info("Batch processing URL: %s", url)
        started = time.perf_counter()
        try:
            if body.return_json:
                # Last path segment names the payload; fall back when the path is empty.
                filename = urlparse(url).path.split("/")[-1] or "url_content"
                return await convert_with_extraction(url, filename)
            return await convert_remote(url)
        except Exception:
            # Isolate failures. Otherwise a single unexpected exception makes
            # asyncio.gather raise, and the whole batch fails with a 500.
            _logger.exception("Unexpected error while batch processing URL: %s", url)
            return BatchFileResult(
                filename=url,
                success=False,
                time_ms=round((time.perf_counter() - started) * 1000, 3),
                error="Unexpected error while processing URL.",
            )

    results = await asyncio.gather(*(process_single_url(url) for url in body.urls))
    total_ms = round((time.perf_counter() - batch_start) * 1000, 3)
    succeeded = sum(1 for r in results if r.success)
    _logger.info("Batch URLs completed. Succeeded: %s/%s", succeeded, len(results))
    return BatchResponse(
        total=len(results),
        succeeded=succeeded,
        failed=len(results) - succeeded,
        total_time_ms=total_ms,
        results=results,
    )