import io
import json
import logging
from pathlib import Path
from typing import Dict, Any, Optional, List, Union, BinaryIO

import aiofiles

from ...core.document.processor import ProcessedDocument
from ..client import LatticeClient


class DocumentClient:
    """Document processing client for SDK.

    Async wrapper around the ``/api/v1/document/*`` endpoints. Every request
    is delegated to the ``LatticeClient`` supplied at construction time.
    """

    def __init__(self, client: LatticeClient):
        """Store the transport client and create the SDK logger.

        Args:
            client: Client whose awaitable ``get`` / ``post`` methods are used
                to perform the requests.
        """
        self.client = client
        self.logger = logging.getLogger("lattice.sdk.document")

    @staticmethod
    async def _load_upload(file: Union[str, Path, BinaryIO], default_name: str):
        """Return ``(filename, content)`` for a path or an open binary stream.

        Paths are read asynchronously through ``aiofiles``. Any other object
        is treated as a readable stream: it is read from its current position
        and is not closed here. ``default_name`` is used when the stream has
        no ``name`` attribute.
        """
        if isinstance(file, (str, Path)):
            file_path = Path(file)
            async with aiofiles.open(file_path, 'rb') as f:
                file_content = await f.read()
            return file_path.name, file_content

        file_content = file.read()
        return getattr(file, 'name', default_name), file_content

    async def process_document(
        self,
        file: Union[str, Path, BinaryIO],
        config: Optional[Dict[str, Any]] = None
    ) -> ProcessedDocument:
        """Process a document.

        Args:
            file: Path to the document, or an already opened binary stream.
            config: Optional processing options. When non-empty they are sent
                JSON-encoded in the ``config`` form field.

        Returns:
            A ``ProcessedDocument`` built from the ``document`` entry of the
            response.

        Raises:
            Exception: Any failure while reading the file, performing the
                request, or building the result is logged and re-raised
                unchanged.
        """
        try:
            filename, file_content = await self._load_upload(file, 'document')

            # aiofiles.tempfile.SpooledTemporaryFile() yields an async context
            # manager rather than a file object, so it cannot be written to
            # synchronously or handed to the HTTP layer. A plain in-memory
            # buffer gives the same file-like upload; the ``with`` block
            # guarantees it is closed on every exit path.
            with io.BytesIO(file_content) as form:
                files = {
                    'file': (filename, form, 'application/octet-stream')
                }

                data = {}
                if config:
                    data['config'] = json.dumps(config)

                response = await self.client.post(
                    "/api/v1/document/process",
                    data=data,
                    files=files
                )

                return ProcessedDocument(**response['document'])

        except Exception as e:
            self.logger.error("Document processing failed: %s", e)
            raise

    async def batch_process(
        self,
        files: List[Union[str, Path, BinaryIO]]
    ) -> Dict[str, ProcessedDocument]:
        """Batch process documents.

        Args:
            files: Paths and/or opened binary streams to upload in a single
                request. Streams without a ``name`` attribute are uploaded as
                ``document_<index>``.

        Returns:
            A dict with one ``ProcessedDocument`` per response entry, keyed by
            the response's own keys (presumably the uploaded filenames; verify
            against the API).

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        # Bound before the ``try`` so the ``finally`` clause can always see it.
        buffers = []
        try:
            upload_files = []

            for file in files:
                filename, file_content = await self._load_upload(
                    file, f'document_{len(upload_files)}'
                )

                # In-memory buffer instead of the aiofiles temp-file wrapper,
                # which is an async context manager and not a file object.
                buffer = io.BytesIO(file_content)
                buffers.append(buffer)

                upload_files.append(
                    ('files', (filename, buffer, 'application/octet-stream'))
                )

            response = await self.client.post(
                "/api/v1/document/batch",
                files=upload_files
            )

            return {
                filename: ProcessedDocument(**doc['document'])
                for filename, doc in response.items()
            }

        except Exception as e:
            self.logger.error("Batch processing failed: %s", e)
            raise

        finally:
            # Release every buffer created so far, including on failure.
            for buffer in buffers:
                buffer.close()

    async def get_supported_types(self) -> Dict[str, List[str]]:
        """Get supported document types.

        Returns:
            The ``supported_types`` entry of the response.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            response = await self.client.get("/api/v1/document/supported-types")
            return response['supported_types']
        except Exception as e:
            self.logger.error("Failed to get supported types: %s", e)
            raise

    async def validate_config(self, config: Dict[str, Any]) -> bool:
        """Validate document processing configuration.

        Args:
            config: Configuration to validate; sent JSON-encoded as the
                ``config`` query parameter.

        Returns:
            The ``valid`` entry of the response.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            response = await self.client.get(
                "/api/v1/document/config/validate",
                params={"config": json.dumps(config)}
            )
            return response['valid']
        except Exception as e:
            self.logger.error("Config validation failed: %s", e)
            raise

    async def health_check(self) -> Dict[str, Any]:
        """Check document processor health.

        Returns:
            The health endpoint's response, passed through unchanged.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            return await self.client.get("/api/v1/document/health")
        except Exception as e:
            self.logger.error("Health check failed: %s", e)
            raise


async def example_usage():
    """Demonstrate single-document, batch, and supported-type calls."""
    client = LatticeClient(api_key="your-api-key")

    # Options sent along with the single-document request.
    options = dict(
        extract_text=True,
        extract_metadata=True,
        chunk_size=500,
        chunk_overlap=50,
    )

    # Process one document and report what came back.
    doc_result = await client.document.process_document("example.pdf", config=options)
    print(f"Processed document: {doc_result.doc_id}")
    print(f"Number of chunks: {len(doc_result.chunks)}")

    # Process several documents in one request.
    batch_paths = ["doc1.pdf", "doc2.docx", "doc3.txt"]
    batch_results = await client.document.batch_process(batch_paths)
    for filename, result in batch_results.items():
        print(f"{filename}: {len(result.chunks)} chunks")

    # List the document types the service accepts.
    supported_types = await client.document.get_supported_types()
    print(f"Supported types: {supported_types}")


# Run the demo only when this module is executed as a script.
if __name__ == "__main__":
    import asyncio

    asyncio.run(example_usage())