import io
import json
import logging
from contextlib import ExitStack
from pathlib import Path
from typing import Any, BinaryIO, Dict, List, Optional, Tuple, Union

import aiofiles

from ...core.document.processor import ProcessedDocument
from ..client import LatticeClient


class DocumentClient:
    """Document processing client for SDK.

    Thin wrapper around a ``LatticeClient`` that targets the
    ``/api/v1/document/*`` endpoints. Every public method logs a failure on
    the ``lattice.sdk.document`` logger and re-raises the original exception.
    """

    def __init__(self, client: LatticeClient):
        """Store the transport client and set up the module logger."""
        self.client = client
        self.logger = logging.getLogger("lattice.sdk.document")

    @staticmethod
    async def _read_upload(
        file: Union[str, Path, BinaryIO],
        default_name: str
    ) -> Tuple[str, bytes]:
        """Return ``(filename, content)`` for a path or an open binary stream.

        Paths are read asynchronously through ``aiofiles`` and named after
        the final path component. Streams are read with a plain ``read()``
        call from their current position and named after their ``name``
        attribute, falling back to ``default_name`` when they have none.
        """
        if isinstance(file, (str, Path)):
            file_path = Path(file)
            async with aiofiles.open(file_path, 'rb') as f:
                content = await f.read()
            return file_path.name, content

        content = file.read()
        return getattr(file, 'name', default_name), content

    async def process_document(
        self,
        file: Union[str, Path, BinaryIO],
        config: Optional[Dict[str, Any]] = None
    ) -> ProcessedDocument:
        """Process a document.

        Args:
            file: Path to the document, or an already-open binary stream.
            config: Optional processing options. When non-empty they are
                JSON-encoded and sent as the ``config`` form field.

        Returns:
            A ``ProcessedDocument`` built from the ``document`` entry of the
            response.

        Raises:
            Exception: Any error raised while reading the file, sending the
                request or building the result is logged and re-raised.
        """
        try:
            filename, file_content = await self._read_upload(file, 'document')

            # Add configuration if provided
            data = {}
            if config:
                data['config'] = json.dumps(config)

            # The payload is already fully in memory, so a plain in-memory
            # buffer is enough. The previous aiofiles spooled temp file is an
            # async context manager and cannot be written to synchronously.
            with io.BytesIO(file_content) as form:
                files = {
                    'file': (filename, form, 'application/octet-stream')
                }
                response = await self.client.post(
                    "/api/v1/document/process",
                    data=data,
                    files=files
                )

            return ProcessedDocument(**response['document'])

        except Exception as e:
            self.logger.error("Document processing failed: %s", e)
            raise

    async def batch_process(
        self,
        files: List[Union[str, Path, BinaryIO]]
    ) -> Dict[str, ProcessedDocument]:
        """Batch process documents.

        Args:
            files: Paths and/or open binary streams to upload in one request.
                Streams without a ``name`` attribute are sent as
                ``document_<index>``, where ``<index>`` is their position in
                ``files``.

        Returns:
            A dict with one ``ProcessedDocument`` per key of the response,
            each built from that entry's ``document`` value.

        Raises:
            Exception: Any error raised while reading the files, sending the
                request or building the results is logged and re-raised.
        """
        try:
            # ExitStack closes every buffer on exit, on success and on error.
            with ExitStack() as buffers:
                upload_files = []
                for index, file in enumerate(files):
                    filename, file_content = await self._read_upload(
                        file, f'document_{index}'
                    )
                    buffer = buffers.enter_context(io.BytesIO(file_content))
                    upload_files.append(
                        ('files', (filename, buffer, 'application/octet-stream'))
                    )

                response = await self.client.post(
                    "/api/v1/document/batch",
                    files=upload_files
                )

            return {
                name: ProcessedDocument(**doc['document'])
                for name, doc in response.items()
            }

        except Exception as e:
            self.logger.error("Batch processing failed: %s", e)
            raise

    async def get_supported_types(self) -> Dict[str, List[str]]:
        """Get supported document types.

        Returns:
            The ``supported_types`` entry of the response.

        Raises:
            Exception: Any request or lookup error is logged and re-raised.
        """
        try:
            response = await self.client.get("/api/v1/document/supported-types")
            return response['supported_types']

        except Exception as e:
            self.logger.error("Failed to get supported types: %s", e)
            raise

    async def validate_config(self, config: Dict[str, Any]) -> bool:
        """Validate document processing configuration.

        Args:
            config: Configuration to validate. It is JSON-encoded and sent as
                the ``config`` query parameter of a GET request.

        Returns:
            The ``valid`` entry of the response.

        Raises:
            Exception: Any request or lookup error is logged and re-raised.
        """
        try:
            response = await self.client.get(
                "/api/v1/document/config/validate",
                params={"config": json.dumps(config)}
            )
            return response['valid']

        except Exception as e:
            self.logger.error("Config validation failed: %s", e)
            raise

    async def health_check(self) -> Dict[str, Any]:
        """Check document processor health.

        Returns:
            The response of the health endpoint, unmodified.

        Raises:
            Exception: Any request error is logged and re-raised.
        """
        try:
            return await self.client.get("/api/v1/document/health")

        except Exception as e:
            self.logger.error("Health check failed: %s", e)
            raise


# Usage example:
async def example_usage():
    """Demonstrate single-document, batch and supported-types calls."""
    # Initialize client
    client = LatticeClient(api_key="your-api-key")

    # Configure document processing
    config = {
        "extract_text": True,
        "extract_metadata": True,
        "chunk_size": 500,
        "chunk_overlap": 50
    }

    # Process single document
    doc_result = await client.document.process_document(
        "example.pdf",
        config=config
    )
    print(f"Processed document: {doc_result.doc_id}")
    print(f"Number of chunks: {len(doc_result.chunks)}")

    # Batch process documents
    files = ["doc1.pdf", "doc2.docx", "doc3.txt"]
    batch_results = await client.document.batch_process(files)
    for filename, result in batch_results.items():
        print(f"{filename}: {len(result.chunks)} chunks")

    # Check supported types
    supported_types = await client.document.get_supported_types()
    print(f"Supported types: {supported_types}")


if __name__ == "__main__":
    import asyncio
    asyncio.run(example_usage())