"""
Sources service layer using API.
"""

from dataclasses import dataclass
from typing import Dict, List, Optional, Union

from loguru import logger

from api.client import api_client
from open_notebook.domain.notebook import Asset, Source


@dataclass
class SourceProcessingResult:
    """Result of source creation with optional async processing info."""

    source: Source
    is_async: bool = False
    command_id: Optional[str] = None
    status: Optional[str] = None
    processing_info: Optional[Dict] = None


@dataclass
class SourceWithMetadata:
    """Source object with additional metadata from API.

    Wraps a ``Source`` together with its ``embedded_chunks`` count and
    forwards the most commonly used attributes to the wrapped object.
    """

    source: Source
    embedded_chunks: int

    # Expose common source properties for easy access
    @property
    def id(self):
        return self.source.id

    @property
    def title(self):
        return self.source.title

    @title.setter
    def title(self, value):
        # Writes go straight through to the wrapped Source.
        self.source.title = value

    @property
    def topics(self):
        return self.source.topics

    @property
    def asset(self):
        return self.source.asset

    @property
    def full_text(self):
        return self.source.full_text

    @property
    def created(self):
        return self.source.created

    @property
    def updated(self):
        return self.source.updated


def _first_record(response: Union[Dict, List[Dict]]) -> Dict:
    """Return ``response`` itself when it is a dict, otherwise ``response[0]``."""
    return response if isinstance(response, dict) else response[0]


def _asset_from(asset_data: Optional[Dict]) -> Optional[Asset]:
    """Build an ``Asset`` from an API asset payload, or ``None`` when it is falsy."""
    if not asset_data:
        return None
    return Asset(file_path=asset_data["file_path"], url=asset_data["url"])


def _apply_record_fields(source: Source, data: Dict) -> Source:
    """Copy ``id``, ``created`` and ``updated`` from ``data`` onto ``source``."""
    source.id = data["id"]
    source.created = data["created"]
    source.updated = data["updated"]
    return source


class SourcesService:
    """Service layer for sources operations using API."""

    def __init__(self):
        logger.info("Using API for sources operations")

    def get_all_sources(
        self, notebook_id: Optional[str] = None
    ) -> List[SourceWithMetadata]:
        """Get all sources with optional notebook filtering.

        Args:
            notebook_id: Forwarded to ``api_client.get_sources``.

        Returns:
            One ``SourceWithMetadata`` per entry in the API response.
            ``embedded_chunks`` defaults to 0 when the entry lacks that key.
        """
        sources_data = api_client.get_sources(notebook_id=notebook_id)

        # Convert API response to SourceWithMetadata objects
        sources = []
        for source_data in sources_data:
            # full_text is deliberately not passed here, so the Source keeps
            # whatever default it declares for that field.
            source = Source(
                title=source_data["title"],
                topics=source_data["topics"],
                asset=_asset_from(source_data["asset"]),
            )
            _apply_record_fields(source, source_data)
            sources.append(
                SourceWithMetadata(
                    source=source,
                    embedded_chunks=source_data.get("embedded_chunks", 0),
                )
            )
        return sources

    def get_source(self, source_id: str) -> SourceWithMetadata:
        """Get a specific source.

        Args:
            source_id: Identifier passed to ``api_client.get_source``.

        Returns:
            The source, including ``full_text``, wrapped with its
            ``embedded_chunks`` count (0 when the key is absent).
        """
        source_data = _first_record(api_client.get_source(source_id))

        source = Source(
            title=source_data["title"],
            topics=source_data["topics"],
            full_text=source_data["full_text"],
            asset=_asset_from(source_data["asset"]),
        )
        _apply_record_fields(source, source_data)

        return SourceWithMetadata(
            source=source,
            embedded_chunks=source_data.get("embedded_chunks", 0),
        )

    def create_source(
        self,
        notebook_id: Optional[str] = None,
        source_type: str = "text",
        url: Optional[str] = None,
        file_path: Optional[str] = None,
        content: Optional[str] = None,
        title: Optional[str] = None,
        transformations: Optional[List[str]] = None,
        embed: bool = False,
        delete_source: bool = False,
        notebooks: Optional[List[str]] = None,
        async_processing: bool = False,
    ) -> Union[Source, SourceProcessingResult]:
        """
        Create a new source with support for async processing.

        Args:
            notebook_id: Single notebook ID (deprecated, use notebooks parameter)
            source_type: Type of source (link, upload, text)
            url: URL for link sources
            file_path: File path for upload sources
            content: Text content for text sources
            title: Optional source title
            transformations: List of transformation IDs to apply
            embed: Whether to embed content for vector search
            delete_source: Whether to delete uploaded file after processing
            notebooks: List of notebook IDs to add source to (preferred over notebook_id)
            async_processing: Whether to process source asynchronously

        Returns:
            Source object for sync processing (backward compatibility)
            SourceProcessingResult for async processing (contains additional metadata)
        """
        source_data = api_client.create_source(
            notebook_id=notebook_id,
            notebooks=notebooks,
            source_type=source_type,
            url=url,
            file_path=file_path,
            content=content,
            title=title,
            transformations=transformations,
            embed=embed,
            delete_source=delete_source,
            async_processing=async_processing,
        )

        # Create Source object from response. Unlike the read paths, the
        # optional fields are looked up leniently with .get().
        response_data = _first_record(source_data)
        source = Source(
            title=response_data["title"],
            topics=response_data.get("topics") or [],
            full_text=response_data.get("full_text"),
            asset=_asset_from(response_data.get("asset")),
        )
        _apply_record_fields(source, response_data)

        # The response is treated as async when it carries any truthy
        # processing field, regardless of the async_processing argument.
        command_id = response_data.get("command_id")
        status = response_data.get("status")
        processing_info = response_data.get("processing_info")
        if command_id or status or processing_info:
            return SourceProcessingResult(
                source=source,
                is_async=True,
                command_id=command_id,
                status=status,
                processing_info=processing_info,
            )

        # Return simple Source for backward compatibility
        return source

    def get_source_status(self, source_id: str) -> Dict:
        """Get processing status for a source.

        Returns:
            The status payload from ``api_client.get_source_status``; a
            non-dict response is reduced to its first element.
        """
        return _first_record(api_client.get_source_status(source_id))

    def create_source_async(
        self,
        notebook_id: Optional[str] = None,
        source_type: str = "text",
        url: Optional[str] = None,
        file_path: Optional[str] = None,
        content: Optional[str] = None,
        title: Optional[str] = None,
        transformations: Optional[List[str]] = None,
        embed: bool = False,
        delete_source: bool = False,
        notebooks: Optional[List[str]] = None,
    ) -> SourceProcessingResult:
        """
        Create a new source with async processing enabled.

        This is a convenience method that always uses async processing.
        Returns a SourceProcessingResult with processing status information.
        """
        result = self.create_source(
            notebook_id=notebook_id,
            notebooks=notebooks,
            source_type=source_type,
            url=url,
            file_path=file_path,
            content=content,
            title=title,
            transformations=transformations,
            embed=embed,
            delete_source=delete_source,
            async_processing=True,
        )

        # Since we forced async_processing=True, this should always be a
        # SourceProcessingResult.
        if isinstance(result, SourceProcessingResult):
            return result

        # Fallback: create_source returned a bare Source (no processing
        # fields in the response); wrap it so the return type stays uniform.
        return SourceProcessingResult(source=result, is_async=False)

    def is_source_processing_complete(self, source_id: str) -> bool:
        """
        Check if a source's async processing is complete.

        Returns True if processing is complete (success or failure),
        False if still processing or queued. Also returns True when the
        status lookup raises; the error is logged rather than propagated.
        """
        try:
            status_data = self.get_source_status(source_id)
            status = status_data.get("status")
            # None indicates legacy/sync source
            return status in ["completed", "failed", None]
        except Exception as e:
            # Deliberate best-effort: report the problem but let callers
            # stop waiting instead of failing.
            logger.error(f"Error checking source processing status: {e}")
            return True  # Assume complete on error

    def update_source(self, source: Source) -> Source:
        """Update a source.

        Only ``title`` and ``topics`` are sent to the API. The same
        ``source`` object is then refreshed in place with the ``title``,
        ``topics`` and ``updated`` values from the response and returned.

        Raises:
            ValueError: If ``source.id`` is falsy.
        """
        if not source.id:
            raise ValueError("Source ID is required for update")

        updates = {
            "title": source.title,
            "topics": source.topics,
        }
        source_data = _first_record(api_client.update_source(source.id, **updates))

        # Update the source object with the response
        source.title = source_data["title"]
        source.topics = source_data["topics"]
        source.updated = source_data["updated"]

        return source

    def delete_source(self, source_id: str) -> bool:
        """Delete a source.

        Returns:
            Always ``True`` once ``api_client.delete_source`` returns; any
            exception it raises propagates to the caller.
        """
        api_client.delete_source(source_id)
        return True


# Global service instance
sources_service = SourcesService()

# Export important classes for easy importing
__all__ = [
    "SourcesService",
    "SourceWithMetadata",
    "SourceProcessingResult",
    "sources_service",
]