# open-notebook / api/sources_service.py
# Commit f871fed (baveshraam): FIX: SurrealDB 2.0 migration syntax and Frontend/CORS link
"""
Sources service layer using API.
"""
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
from loguru import logger
from api.client import api_client
from open_notebook.domain.notebook import Asset, Source
@dataclass
class SourceProcessingResult:
    """Outcome of creating a source, with async-processing details when present.

    Only ``source`` is required; every other field keeps its default unless
    the creator of the result supplies a value.
    """

    # The source that was created.
    source: Source
    # True when the result describes asynchronous processing.
    is_async: bool = False
    # Identifier of the processing command, when one was reported.
    command_id: Optional[str] = None
    # Processing status string, when one was reported.
    status: Optional[str] = None
    # Additional processing details, when reported.
    processing_info: Optional[Dict] = None
@dataclass
class SourceWithMetadata:
    """A ``Source`` paired with additional metadata returned by the API.

    The properties below forward to the wrapped ``source`` so callers can use
    the wrapper in place of the source for common attribute access. Only
    ``title`` can also be assigned through the wrapper.
    """

    # The wrapped source object.
    source: Source
    # Embedded-chunk count supplied alongside the source.
    embedded_chunks: int

    @property
    def id(self):
        """Forward ``source.id``."""
        return self.source.id

    @property
    def title(self):
        """Forward ``source.title``."""
        return self.source.title

    @title.setter
    def title(self, value):
        # Writes go straight through to the wrapped source.
        self.source.title = value

    @property
    def topics(self):
        """Forward ``source.topics``."""
        return self.source.topics

    @property
    def asset(self):
        """Forward ``source.asset``."""
        return self.source.asset

    @property
    def full_text(self):
        """Forward ``source.full_text``."""
        return self.source.full_text

    @property
    def created(self):
        """Forward ``source.created``."""
        return self.source.created

    @property
    def updated(self):
        """Forward ``source.updated``."""
        return self.source.updated
class SourcesService:
    """Service layer for sources operations using API.

    Every public method delegates to the shared ``api_client`` and converts
    the returned payloads into ``Source`` domain objects (optionally wrapped
    in ``SourceWithMetadata`` or ``SourceProcessingResult``).
    """

    # Statuses for which processing is treated as finished.
    # None indicates a legacy/sync source.
    _FINISHED_STATUSES = ("completed", "failed", None)

    def __init__(self):
        logger.info("Using API for sources operations")

    @staticmethod
    def _first_record(response: Union[Dict, List[Dict]]) -> Dict:
        """Return ``response`` itself when it is a dict, otherwise its first item."""
        return response if isinstance(response, dict) else response[0]

    @staticmethod
    def _build_source(data: Dict, include_full_text: bool = True) -> Source:
        """Build a ``Source`` from one API payload dict.

        ``title``, ``id``, ``created`` and ``updated`` must be present in
        ``data``. ``topics``, ``asset`` and ``full_text`` are read leniently:
        missing or empty ``topics`` become ``[]``, a missing or falsy
        ``asset`` becomes ``None``, and a missing ``full_text`` becomes
        ``None``.

        Args:
            data: Payload dict describing a single source.
            include_full_text: When False, ``full_text`` is not passed to
                the ``Source`` constructor at all.

        Returns:
            The populated ``Source``.
        """
        asset_data = data.get("asset")
        asset = (
            Asset(
                file_path=asset_data.get("file_path"),
                url=asset_data.get("url"),
            )
            if asset_data
            else None
        )

        fields = {
            "title": data["title"],
            "topics": data.get("topics") or [],
            "asset": asset,
        }
        if include_full_text:
            fields["full_text"] = data.get("full_text")

        source = Source(**fields)
        # These attributes are assigned after construction rather than
        # passed to the constructor.
        source.id = data["id"]
        source.created = data["created"]
        source.updated = data["updated"]
        return source

    def get_all_sources(self, notebook_id: Optional[str] = None) -> List[SourceWithMetadata]:
        """Get all sources with optional notebook filtering.

        Args:
            notebook_id: Forwarded to ``api_client.get_sources``.

        Returns:
            One ``SourceWithMetadata`` per returned item. ``full_text`` is not
            populated here; ``embedded_chunks`` defaults to 0 when absent.
        """
        sources_data = api_client.get_sources(notebook_id=notebook_id)
        return [
            SourceWithMetadata(
                source=self._build_source(item, include_full_text=False),
                embedded_chunks=item.get("embedded_chunks", 0),
            )
            for item in sources_data
        ]

    def get_source(self, source_id: str) -> SourceWithMetadata:
        """Get a specific source.

        Args:
            source_id: Identifier passed to ``api_client.get_source``.

        Returns:
            The source (including ``full_text``) wrapped with its
            ``embedded_chunks`` count, which defaults to 0 when absent.
        """
        source_data = self._first_record(api_client.get_source(source_id))
        return SourceWithMetadata(
            source=self._build_source(source_data),
            embedded_chunks=source_data.get("embedded_chunks", 0),
        )

    def create_source(
        self,
        notebook_id: Optional[str] = None,
        source_type: str = "text",
        url: Optional[str] = None,
        file_path: Optional[str] = None,
        content: Optional[str] = None,
        title: Optional[str] = None,
        transformations: Optional[List[str]] = None,
        embed: bool = False,
        delete_source: bool = False,
        notebooks: Optional[List[str]] = None,
        async_processing: bool = False,
    ) -> Union[Source, SourceProcessingResult]:
        """
        Create a new source with support for async processing.

        Args:
            notebook_id: Single notebook ID (deprecated, use notebooks parameter)
            source_type: Type of source (link, upload, text)
            url: URL for link sources
            file_path: File path for upload sources
            content: Text content for text sources
            title: Optional source title
            transformations: List of transformation IDs to apply
            embed: Whether to embed content for vector search
            delete_source: Whether to delete uploaded file after processing
            notebooks: List of notebook IDs to add source to (preferred over notebook_id)
            async_processing: Whether to process source asynchronously

        Returns:
            Source object for sync processing (backward compatibility)
            SourceProcessingResult for async processing (contains additional metadata)
        """
        response = api_client.create_source(
            notebook_id=notebook_id,
            notebooks=notebooks,
            source_type=source_type,
            url=url,
            file_path=file_path,
            content=content,
            title=title,
            transformations=transformations,
            embed=embed,
            delete_source=delete_source,
            async_processing=async_processing,
        )
        payload = self._first_record(response)
        source = self._build_source(payload)

        # The response is treated as async when any of these keys carries a
        # truthy value, regardless of the async_processing argument.
        is_async_response = any(
            payload.get(key) for key in ("command_id", "status", "processing_info")
        )
        if not is_async_response:
            # Return simple Source for backward compatibility
            return source

        return SourceProcessingResult(
            source=source,
            is_async=True,
            command_id=payload.get("command_id"),
            status=payload.get("status"),
            processing_info=payload.get("processing_info"),
        )

    def get_source_status(self, source_id: str) -> Dict:
        """Get processing status for a source.

        Returns the response dict, or its first item when the client returns
        a non-dict response.
        """
        return self._first_record(api_client.get_source_status(source_id))

    def create_source_async(
        self,
        notebook_id: Optional[str] = None,
        source_type: str = "text",
        url: Optional[str] = None,
        file_path: Optional[str] = None,
        content: Optional[str] = None,
        title: Optional[str] = None,
        transformations: Optional[List[str]] = None,
        embed: bool = False,
        delete_source: bool = False,
        notebooks: Optional[List[str]] = None,
    ) -> SourceProcessingResult:
        """
        Create a new source with async processing enabled.

        This is a convenience method that always calls ``create_source`` with
        ``async_processing=True`` and always returns a
        ``SourceProcessingResult``.
        """
        result = self.create_source(
            notebook_id=notebook_id,
            notebooks=notebooks,
            source_type=source_type,
            url=url,
            file_path=file_path,
            content=content,
            title=title,
            transformations=transformations,
            embed=embed,
            delete_source=delete_source,
            async_processing=True,
        )
        if isinstance(result, SourceProcessingResult):
            return result
        # create_source returned a plain Source (no async markers in the
        # response); wrap it so the return type stays uniform.
        return SourceProcessingResult(source=result, is_async=False)

    def is_source_processing_complete(self, source_id: str) -> bool:
        """
        Check if a source's async processing is complete.

        Returns True if processing is complete (success or failure) or no
        status is reported, False for any other status. Any exception while
        fetching the status is logged and treated as complete.
        """
        try:
            status = self.get_source_status(source_id).get("status")
            return status in self._FINISHED_STATUSES
        except Exception as e:
            logger.error(f"Error checking source processing status: {e}")
            return True  # Assume complete on error

    def update_source(self, source: Source) -> Source:
        """Update a source.

        Sends the source's ``title`` and ``topics`` to the API, then copies
        ``title``, ``topics`` and ``updated`` from the response back onto the
        same object.

        Args:
            source: Source to update; must have a truthy ``id``.

        Returns:
            The same ``source`` instance, refreshed from the response.

        Raises:
            ValueError: If ``source.id`` is empty.
        """
        if not source.id:
            raise ValueError("Source ID is required for update")

        response = api_client.update_source(
            source.id,
            title=source.title,
            topics=source.topics,
        )
        updated = self._first_record(response)

        source.title = updated["title"]
        source.topics = updated["topics"]
        source.updated = updated["updated"]
        return source

    def delete_source(self, source_id: str) -> bool:
        """Delete a source; returns True once the API call has returned."""
        api_client.delete_source(source_id)
        return True
# Shared module-level service instance
sources_service = SourcesService()

# Names exported by this module for easy importing
__all__ = [
    "SourcesService",
    "SourceWithMetadata",
    "SourceProcessingResult",
    "sources_service",
]